diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index a2899ead8e4db65197c82fe491caf1335bae1eca..4a3ce2db86b01a3c7134bed68b389bd7d3b97b7c 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -63,7 +63,7 @@ extern "C" { typedef struct { col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1)) int32_t type : 8; // column type - int32_t bytes : 24; // column bytes (restore to int32_t in case of misuse) + int32_t bytes : 24; // column bytes (0~16M) int32_t sma : 8; // block SMA: 0, no SMA, 1, sum/min/max, 2, ... int32_t offset : 24; // point offset in STpRow after the header part. } STColumn; @@ -81,12 +81,12 @@ typedef struct { // ----------------- TSDB SCHEMA DEFINITION typedef struct { - int32_t version; // version - int32_t numOfCols; // Number of columns appended - int32_t tlen; // maximum length of a STpRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + - // (bytes)) - uint16_t flen; // First part length in a STpRow after the header part - uint16_t vlen; // pure value part length, excluded the overhead (bytes only) + int32_t numOfCols; // Number of columns appended + schema_ver_t version; // schema version + uint16_t flen; // First part length in a STpRow after the header part + int32_t vlen; // pure value part length, excluded the overhead (bytes only) + int32_t tlen; // maximum length of a STpRow without the header part + // (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + (bytes)) STColumn columns[]; } STSchema; @@ -120,13 +120,13 @@ static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) { // ----------------- SCHEMA BUILDER DEFINITION typedef struct { - int32_t tCols; - int32_t nCols; - int32_t tlen; - uint16_t flen; - uint16_t vlen; - int32_t version; - STColumn *columns; + int32_t tCols; + int32_t nCols; + schema_ver_t version; + uint16_t flen; + int32_t vlen; + int32_t tlen; + STColumn *columns; } STSchemaBuilder; #define TD_VTYPE_BITS 2 // val type @@ -136,9 +136,9 @@ typedef struct { #define TD_BITMAP_BYTES(cnt) (ceil((double)cnt / TD_VTYPE_PARTS)) #define TD_BIT_TO_BYTES(cnt) (ceil((double)cnt / 8)) -int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); +int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version); void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder); -void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); +void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version); int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 87dc752703473f38ee773f11ebc46a4817747293..19442af206a7f5feb7c0bb8e9721d440dc40a478 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -30,6 +30,7 @@ typedef uint8_t TDRowValT; typedef int16_t col_id_t; typedef int8_t col_type_t; typedef int32_t col_bytes_t; +typedef uint16_t schema_ver_t; #pragma pack(push, 1) typedef struct { diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 1b7157c49cb0c5e5bc387f61ecaf04dfea4a961b..7fd66e95ad3f0194a1b66953cf0952695e7820d4 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -123,7 +123,7 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) { return buf; } -int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { +int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) { if (pBuilder == NULL) return -1; pBuilder->tCols = 256; @@ -140,7 +140,7 @@ void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) { } } -void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { +void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) { pBuilder->nCols = 0; pBuilder->tlen = 0; pBuilder->flen = 0; @@ -168,6 +168,9 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); } + // TODO: set sma value by user input + pCol->sma = 1; + if (IS_VAR_DATA_TYPE(type)) { colSetBytes(pCol, bytes); pBuilder->tlen += (TYPE_BYTES[type] + bytes); diff --git a/source/dnode/vnode/src/inc/tsdbReadImpl.h b/source/dnode/vnode/src/inc/tsdbReadImpl.h index cd24358b27d756f76e58a1121c50e3303eae426f..17c220a35a30b97b4989cf7bd6dc1c84643e937a 100644 --- a/source/dnode/vnode/src/inc/tsdbReadImpl.h +++ b/source/dnode/vnode/src/inc/tsdbReadImpl.h @@ -67,12 +67,13 @@ typedef struct { uint8_t last : 1; uint8_t blkVer : 7; uint8_t numOfSubBlocks; - int16_t numOfCols; // not including timestamp column + col_id_t numOfCols; // not including timestamp column uint32_t len; // data block length - uint32_t keyLen : 24; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols + uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols + uint32_t algorithm : 4; uint32_t reserve : 8; - int32_t algorithm : 8; - int32_t numOfRows : 24; + col_id_t numOfBSma; + uint16_t numOfRows; int64_t offset; uint64_t aggrStat : 1; uint64_t aggrOffset : 63; @@ -80,7 +81,7 @@ typedef struct { TSKEY keyLast; } SBlockV0; -#define SBlock SBlockV0 // latest SBlock definition +#define SBlock SBlockV0 // latest SBlock definition #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index eb8df610515f2c48ca84a55bf003e1fd80208321..3e0b03f3318ea5f10e224da58bd842404df3e9ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -43,20 +43,20 @@ typedef struct { #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) -#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) -#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) -#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet)) -#define TSDB_COMMIT_TABLE(ch) ((ch)->pTable) -#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) -#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) -#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) -#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD) -#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL) -#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) -#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) -#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) +#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) +#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) +#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet)) +#define TSDB_COMMIT_TABLE(ch) ((ch)->pTable) +#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) +#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) +#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) +#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD) +#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL) +#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) +#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) +#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock) -#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) +#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) static void tsdbStartCommit(STsdb *pRepo); static void tsdbEndCommit(STsdb *pTsdb, int eno); @@ -1204,9 +1204,10 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { - STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlockData * pBlockData = NULL; + STsdbCfg *pCfg = REPO_CFG(pRepo); + SBlockData *pBlockData = NULL; SAggrBlkData *pAggrBlkData = NULL; + STSchema *pSchema = pTable->pSchema; int64_t offset = 0, offsetAggr = 0; int rowsToWrite = pDataCols->numOfRows; @@ -1225,10 +1226,12 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF pAggrBlkData = (SAggrBlkData *)(*ppExBuf); // Get # of cols not all NULL(not including key column) - int nColsNotAllNull = 0; + col_id_t nColsNotAllNull = 0; + col_id_t nColsOfBlockSma = 0; for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column - SDataCol * pDataCol = pDataCols->cols + ncol; - SBlockCol * pBlockCol = pBlockData->cols + nColsNotAllNull; + STColumn *pColumn = pSchema->columns + ncol; + SDataCol *pDataCol = pDataCols->cols + ncol; + SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsNotAllNull; if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it @@ -1260,7 +1263,12 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF } else { TD_SET_COL_ROWS_MISC(pBlockCol); } - nColsNotAllNull++; + + ++nColsNotAllNull; + + if (pColumn->sma) { + ++nColsOfBlockSma; + } } ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); @@ -1357,9 +1365,8 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF return -1; } - uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0; + uint32_t aggrStatus = nColsOfBlockSma > 0 ? 1 : 0; if (aggrStatus > 0) { - taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr); tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM))); @@ -1378,6 +1385,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF pBlock->keyLen = keyLen; pBlock->numOfSubBlocks = isSuper ? 1 : 0; pBlock->numOfCols = nColsNotAllNull; + pBlock->numOfBSma = nColsOfBlockSma; pBlock->keyFirst = dataColsKeyFirst(pDataCols); pBlock->keyLast = dataColsKeyLast(pDataCols); pBlock->aggrStat = aggrStatus; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index 9619ac036efa63a3b2aaa64855ca2f52600f0a72..304b3286fe31e2c796c7f68cd0f98df6f4741a87 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -321,7 +321,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { return -1; } - size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer); + size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfBSma, (uint32_t)pBlock->blkVer); if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1; int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 9b1bb61d070a424b43bd7cd46253dd81f1a17d35..c220e6001fc43a4886d5b973af5542e62989474c 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -189,12 +189,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA -#if 0 +#if 1 + SSmaCfg vCreateSmaReq = {0}; if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, terrstr(terrno)); return -1; } + vWarn("vgId%d: TDMT_VND_CREATE_SMA received for %s:%" PRIi64, pVnode->config.vgId, vCreateSmaReq.tSma.indexName, + vCreateSmaReq.tSma.indexUid); // record current timezone of server side tstrncpy(vCreateSmaReq.tSma.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 29a4b7f552a06e3f195a066916489c78ded47a66..d010ea4437e948553e8b1cd0d3272c063269eb2e 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -330,7 +330,6 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ASSERT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); // step 2: insert data - STSmaDataWrapper *pSmaData = NULL; STsdb *pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb)); STsdbCfg *pCfg = &pTsdb->config; @@ -416,6 +415,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { col_id_t numOfCols = 4096; ASSERT_GT(numOfCols, 0); +#if 0 + STSmaDataWrapper *pSmaData = NULL; pSmaData = (STSmaDataWrapper *)buf; printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); pSmaData->skey = skey1; @@ -459,9 +460,13 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { pSmaData->dataLen = (len - sizeof(STSmaDataWrapper)); ASSERT_GE(bufSize, pSmaData->dataLen); - // execute ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); +#endif + + SSDataBlock *pSmaData = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); + + // step 3: query uint32_t checkDataCnt = 0;