diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bb08a345c285f50e69258f5c61c4535af7a9ba19..bd24b45ccd36de2a84c27b682e803c0549cee76f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -183,6 +183,8 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *m int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now); int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline); void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg); +int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); +int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); @@ -238,9 +240,9 @@ int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile); int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); -int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf); -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); -int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf); +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); +int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); +int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast, uint8_t **ppBuf); @@ -248,9 +250,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); -int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf); -int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf); -int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf); +int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); +int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL); +int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, @@ -605,6 +607,9 @@ struct SDataFWriter { SLastFile fLast; SSmaFile fSma; + uint8_t *pBuf1; + uint8_t *pBuf2; + SDiskData dData; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index f03b02af27bcef4a0eed50d49ae597d20ba29f18..1fc5eae13ac282231af8429e0eacf300448843d4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -465,13 +465,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (code) goto _err; // tMapDataReset(&state->blockIdxMap); - // code = tsdbReadBlockIdx(state->pDataFReader, &state->blockIdxMap, NULL); if (!state->aBlockIdx) { state->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); } else { taosArrayClear(state->aBlockIdx); } - code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx, NULL); + code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx); if (code) goto _err; /* if (state->pBlockIdx) { */ @@ -487,8 +486,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { } tMapDataReset(&state->blockMap); - code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap, NULL); - /* code = tsdbReadBlock(state->pDataFReader, &state->blockIdx, &state->blockMap, NULL); */ + code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap); if (code) goto _err; state->nBlock = state->blockMap.nItem; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index aa76ab1e27d5b27eac102fa7edce7041e29c756f..5470e434006007b6099666c3f74514d02e26025b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -339,7 +339,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); - code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL); + code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); if (code) goto _exit; ASSERT(pCommitter->dReader.mBlock.nItem > 0); @@ -370,7 +370,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { if (code) goto _err; // data - code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL); + code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx); if (code) goto _err; pCommitter->dReader.iBlockIdx = 0; @@ -378,8 +378,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); - code = - tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL); + code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); if (code) goto _err; } else { pCommitter->dReader.pBlockIdx = NULL; @@ -387,7 +386,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { tBlockDataReset(&pCommitter->dReader.bData); // last - code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL); + code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL); if (code) goto _err; pCommitter->dReader.iBlockL = -1; @@ -506,7 +505,7 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { tBlockReset(&block); // as a new block } - // statistic + // info block.nRow += pBlockData->nRow; for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; @@ -532,7 +531,7 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { // write block.nSubBlock++; code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock - 1], - ((block.nSubBlock == 1) && block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0, + ((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0, NULL); if (code) goto _err; @@ -557,7 +556,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { ASSERT(pBlockData->nRow > 0); - // statistic + // info blockL.suid = pBlockData->suid; blockL.nRow = pBlockData->nRow; blockL.minVer = VERSION_MAX; @@ -1041,7 +1040,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { // end if (pCommitter->dWriter.mBlock.nItem > 0) { SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid}; - code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, &blockIdx); + code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); if (code) goto _err; if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { @@ -1067,11 +1066,11 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; // write aBlockIdx - code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL); + code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx); if (code) goto _err; // write aBlockL - code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL); + code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL); if (code) goto _err; // update file header @@ -1107,7 +1106,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break; SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; - code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, NULL, &blockIdx); + code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); if (code) goto _err; if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 84ecaa605d50b040d87cc7c6bad4f0e0c801bf38..df61d3d4c9186b4d14a749f3507db0c7e59c703e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -545,7 +545,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx)); int64_t st = taosGetTimestampUs(); - int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL); + int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx); if (code != TSDB_CODE_SUCCESS) { goto _end; } @@ -617,7 +617,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); tMapDataReset(&pScanInfo->mapData); - tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData, NULL); + tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); size += pScanInfo->mapData.nData; for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index c542ab5583db163aeec2d3aae1d8319866a55ab6..4db03338905b4144bd128a7407e79c02a3bd079d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -441,6 +441,9 @@ struct SDataFReader { TdFilePtr pDataFD; TdFilePtr pLastFD; TdFilePtr pSmaFD; + + uint8_t *pBuf1; + uint8_t *pBuf2; }; int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { @@ -523,6 +526,8 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { goto _err; } + tFree((*ppReader)->pBuf1); + tFree((*ppReader)->pBuf2); taosMemoryFree(*ppReader); _exit: @@ -534,14 +539,12 @@ _err: return code; } -int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf) { - int32_t code = 0; - int64_t offset = pReader->pSet->pHeadF->offset; - int64_t size = pReader->pSet->pHeadF->loffset - offset; - uint8_t *pBuf = NULL; - int64_t n; - uint32_t delimiter; - SBlockIdx blockIdx; +int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { + int32_t code = 0; + int64_t offset = pReader->pSet->pHeadF->offset; + int64_t size = pReader->pSet->pHeadF->loffset - offset; + int64_t n; + uint32_t delimiter; taosArrayClear(aBlockIdx); if (size == 0) { @@ -549,8 +552,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB } // alloc - if (!ppBuf) ppBuf = &pBuf; - code = tRealloc(ppBuf, size); + code = tRealloc(&pReader->pBuf1, size); if (code) goto _err; // seek @@ -560,7 +562,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB } // read - n = taosReadFile(pReader->pHeadFD, *ppBuf, size); + n = taosReadFile(pReader->pHeadFD, pReader->pBuf1, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -570,18 +572,19 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB } // check - if (!taosCheckChecksumWhole(*ppBuf, size)) { + if (!taosCheckChecksumWhole(pReader->pBuf1, size)) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } // decode n = 0; - n = tGetU32(*ppBuf + n, &delimiter); + n = tGetU32(pReader->pBuf1 + n, &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); while (n < size - sizeof(TSCKSUM)) { - n += tGetBlockIdx(*ppBuf + n, &blockIdx); + SBlockIdx blockIdx; + n += tGetBlockIdx(pReader->pBuf1 + n, &blockIdx); if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -591,24 +594,20 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB ASSERT(n + sizeof(TSCKSUM) == size); - tFree(pBuf); _exit: return code; _err: tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - tFree(pBuf); return code; } -int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) { +int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) { int32_t code = 0; int64_t offset = pReader->pSet->pHeadF->loffset; int64_t size = pReader->pSet->pHeadF->size - offset; int64_t n; uint32_t delimiter; - uint8_t *pBuf = NULL; - SBlockL blockl; taosArrayClear(aBlockL); if (size == 0) { @@ -616,8 +615,7 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) } // alloc - if (!ppBuf) ppBuf = &pBuf; - code = tRealloc(ppBuf, size); + code = tRealloc(&pReader->pBuf1, size); if (code) goto _err; // seek @@ -627,7 +625,7 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) } // read - n = taosReadFile(pReader->pHeadFD, *ppBuf, size); + n = taosReadFile(pReader->pHeadFD, pReader->pBuf1, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -637,18 +635,19 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) } // check - if (!taosCheckChecksumWhole(*ppBuf, size)) { + if (!taosCheckChecksumWhole(pReader->pBuf1, size)) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } // decode n = 0; - n = tGetU32(*ppBuf + n, &delimiter); + n = tGetU32(pReader->pBuf1 + n, &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); while (n < size - sizeof(TSCKSUM)) { - n += tGetBlockL(*ppBuf + n, &blockl); + SBlockL blockl; + n += tGetBlockL(pReader->pBuf1 + n, &blockl); if (taosArrayPush(aBlockL, &blockl) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -658,29 +657,23 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) ASSERT(n + sizeof(TSCKSUM) == size); - tFree(pBuf); _exit: return code; _err: tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - tFree(pBuf); return code; } -int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock, uint8_t **ppBuf) { - int32_t code = 0; - int64_t offset = pBlockIdx->offset; - int64_t size = pBlockIdx->size; - uint8_t *pBuf = NULL; - int64_t n; - int64_t tn; - SDiskDataHdr hdr; - - if (!ppBuf) ppBuf = &pBuf; +int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) { + int32_t code = 0; + int64_t offset = pBlockIdx->offset; + int64_t size = pBlockIdx->size; + int64_t n; + int64_t tn; // alloc - code = tRealloc(ppBuf, size); + code = tRealloc(&pReader->pBuf1, size); if (code) goto _err; // seek @@ -690,7 +683,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl } // read - n = taosReadFile(pReader->pHeadFD, *ppBuf, size); + n = taosReadFile(pReader->pHeadFD, pReader->pBuf1, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -700,19 +693,19 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl } // check - if (!taosCheckChecksumWhole(*ppBuf, size)) { + if (!taosCheckChecksumWhole(pReader->pBuf1, size)) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } // decode - hdr = *(SDiskDataHdr *)(*ppBuf); - ASSERT(hdr.delimiter == TSDB_FILE_DLMT); - ASSERT(hdr.suid == pBlockIdx->suid); - ASSERT(hdr.uid == pBlockIdx->uid); + n = 0; + + uint32_t delimiter; + n += tGetU32(pReader->pBuf1 + n, &delimiter); + ASSERT(delimiter == TSDB_FILE_DLMT); - n = sizeof(hdr); - tn = tGetMapData(*ppBuf + n, mBlock); + tn = tGetMapData(pReader->pBuf1 + n, mBlock); if (tn < 0) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -720,12 +713,10 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl n += tn; ASSERT(n + sizeof(TSCKSUM) == size); - tFree(pBuf); return code; _err: tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - tFree(pBuf); return code; } @@ -1642,6 +1633,8 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { } tDiskDataClear(&(*ppWriter)->dData); + tFree((*ppWriter)->pBuf1); + tFree((*ppWriter)->pBuf2); taosMemoryFree(*ppWriter); _exit: *ppWriter = NULL; @@ -1732,10 +1725,9 @@ _err: return code; } -int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf) { +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) { int32_t code = 0; SHeadFile *pHeadFile = &pWriter->fHead; - uint8_t *pBuf = NULL; int64_t size; int64_t n; @@ -1746,29 +1738,28 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp } // prepare - size = tPutU32(NULL, TSDB_FILE_DLMT); + size = sizeof(uint32_t); for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx)); } size += sizeof(TSCKSUM); // alloc - if (!ppBuf) ppBuf = &pBuf; - code = tRealloc(ppBuf, size); + code = tRealloc(&pWriter->pBuf1, size); if (code) goto _err; // build n = 0; - n = tPutU32(*ppBuf + n, TSDB_FILE_DLMT); + n = tPutU32(pWriter->pBuf1 + n, TSDB_FILE_DLMT); for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { - n += tPutBlockIdx(*ppBuf + n, taosArrayGet(aBlockIdx, iBlockIdx)); + n += tPutBlockIdx(pWriter->pBuf1 + n, taosArrayGet(aBlockIdx, iBlockIdx)); } - taosCalcChecksumAppend(0, *ppBuf, size); + taosCalcChecksumAppend(0, pWriter->pBuf1, size); ASSERT(n + sizeof(TSCKSUM) == size); // write - n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size); + n = taosWriteFile(pWriter->pHeadFD, pWriter->pBuf1, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -1779,44 +1770,36 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp pHeadFile->size += size; _exit: - tFree(pBuf); return code; _err: tsdbError("vgId:%d, write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); - tFree(pBuf); return code; } -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { - int32_t code = 0; - SHeadFile *pHeadFile = &pWriter->fHead; - SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - uint8_t *pBuf = NULL; - int64_t size; - int64_t n; +int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) { + int32_t code = 0; + SHeadFile *pHeadFile = &pWriter->fHead; + int64_t size; + int64_t n; ASSERT(mBlock->nItem > 0); - // prepare - size = sizeof(SDiskDataHdr) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); - // alloc - if (!ppBuf) ppBuf = &pBuf; - code = tRealloc(ppBuf, size); + size = sizeof(uint32_t) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); + code = tRealloc(&pWriter->pBuf1, size); if (code) goto _err; // build n = 0; - *(SDiskDataHdr *)(*ppBuf) = hdr; - n += sizeof(hdr); - n += tPutMapData(*ppBuf + n, mBlock); - taosCalcChecksumAppend(0, *ppBuf, size); + n += tPutU32(pWriter->pBuf1 + n, TSDB_FILE_DLMT); + n += tPutMapData(pWriter->pBuf1 + n, mBlock); + taosCalcChecksumAppend(0, pWriter->pBuf1, size); ASSERT(n + sizeof(TSCKSUM) == size); // write - n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size); + n = taosWriteFile(pWriter->pHeadFD, pWriter->pBuf1, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -1827,21 +1810,18 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, pBlockIdx->size = size; pHeadFile->size += size; - tFree(pBuf); tsdbTrace("vgId:%d, write block, offset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), pBlockIdx->offset, pBlockIdx->size); return code; _err: - tFree(pBuf); tsdbError("vgId:%d, write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf) { +int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { int32_t code = 0; SHeadFile *pHeadFile = &pWriter->fHead; - uint8_t *pBuf = NULL; int64_t size; int64_t n; @@ -1852,29 +1832,28 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf) } // size - size = sizeof(uint32_t); + size = sizeof(uint32_t); // TSDB_FILE_DLMT for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { size += tPutBlockL(NULL, taosArrayGet(aBlockL, iBlockL)); } size += sizeof(TSCKSUM); // alloc - if (!ppBuf) ppBuf = &pBuf; - code = tRealloc(ppBuf, size); + code = tRealloc(&pWriter->pBuf1, size); if (code) goto _err; // encode n = 0; - n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); + n += tPutU32(pWriter->pBuf1 + n, TSDB_FILE_DLMT); for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { - n += tPutBlockL(*ppBuf + n, taosArrayGet(aBlockL, iBlockL)); + n += tPutBlockL(pWriter->pBuf1 + n, taosArrayGet(aBlockL, iBlockL)); } - taosCalcChecksumAppend(0, *ppBuf, size); + taosCalcChecksumAppend(0, pWriter->pBuf1, size); ASSERT(n + sizeof(TSCKSUM) == size); // write - n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size); + n = taosWriteFile(pWriter->pHeadFD, pWriter->pBuf1, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -1885,12 +1864,10 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf) pHeadFile->size += size; _exit: - tFree(pBuf); return code; _err: tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); - tFree(pBuf); return code; } @@ -1918,47 +1895,49 @@ static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) { pBlock->nRow += pBlockData->nRow; } -static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t **ppBuf) { - int32_t code = 0; - int64_t n; - SColData *pColData; +static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) { + int32_t code = 0; - // prepare - pSubBlock->nSma = 0; + pSmaInfo->offset = 0; + pSmaInfo->size = 0; + + // encode for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { - pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + + if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue; - if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue; + SColumnDataAgg sma; + tsdbCalcColDataSMA(pColData, &sma); - pSubBlock->nSma++; + code = tRealloc(&pWriter->pBuf1, pSmaInfo->size + tPutColumnDataAgg(NULL, &sma)); + if (code) goto _err; + pSmaInfo->size += tPutColumnDataAgg(pWriter->pBuf1 + pSmaInfo->size, &sma); } - if (pSubBlock->nSma == 0) goto _exit; - // calc - code = tRealloc(ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); - if (code) goto _err; - n = 0; - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { - pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + // write + if (pSmaInfo->size) { + int32_t size = pSmaInfo->size + sizeof(TSCKSUM); - if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue; + code = tRealloc(&pWriter->pBuf1, size); + if (code) goto _err; - tsdbCalcColDataSMA(pColData, &((SColumnDataAgg *)(*ppBuf))[n]); - n++; - } - taosCalcChecksumAppend(0, *ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); + taosCalcChecksumAppend(0, pWriter->pBuf1, size); - // write - n = taosWriteFile(pFD, *ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + int64_t n = taosWriteFile(pWriter->pSmaFD, pWriter->pBuf1, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pSmaInfo->offset = pWriter->fSma.size; + pWriter->fSma.size += size; } -_exit: return code; _err: + tsdbError("vgId:%d tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -2036,12 +2015,11 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) { } -// ================= SMA ==================== -_write_sma: - if (toLast) goto _exit; - if (pSmaInfo == NULL) goto _exit; - - // TODO + // ================= SMA ==================== + if (pSmaInfo) { + code = tsdbWriteBlockSma(pWriter, pBlockData, pSmaInfo); + if (code) goto _err; + } _exit: tFree(pBuf); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index eec7a3e4a66a85ebf3b66ee67e2687f8f2ee1df9..2b5ea030b2f8780e99552d0a5e342a154d35f6c8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -58,7 +58,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { if (code) goto _err; // SBlockIdx - code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL); + code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); if (code) goto _err; pReader->iBlockIdx = 0; @@ -78,7 +78,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); pReader->iBlockIdx++; - code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL); + code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); if (code) goto _err; pReader->iBlock = 0; @@ -552,7 +552,7 @@ _err: static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { int32_t code = 0; - code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL); + code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock); if (code) goto _err; // SBlockData @@ -791,7 +791,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { } if (pWriter->pBlockIdx) { - code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL); + code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock); if (code) goto _err; } else { tMapDataReset(&pWriter->mBlock); @@ -843,7 +843,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { pWriter->iBlockIdx++; } - code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL); + code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW); if (code) goto _err; code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); @@ -897,7 +897,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); if (code) goto _err; - code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL); + code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); if (code) goto _err; } else { ASSERT(pWriter->pDataFReader == NULL); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 72d9175078f37cebcafcf9ef2b9f31c8e28d39a7..0ec922f20efb7753893bc0fc763a47ae443bd973 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -152,25 +152,6 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2) { return 0; } -// TSDBKEY ====================================================== -static FORCE_INLINE int32_t tPutTSDBKEY(uint8_t *p, TSDBKEY *pKey) { - int32_t n = 0; - - n += tPutI64v(p ? p + n : p, pKey->version); - n += tPutI64(p ? p + n : p, pKey->ts); - - return n; -} - -static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) { - int32_t n = 0; - - n += tGetI64v(p + n, &pKey->version); - n += tGetI64(p + n, &pKey->ts); - - return n; -} - // SBlockIdx ====================================================== int32_t tPutBlockIdx(uint8_t *p, void *ph) { int32_t n = 0; @@ -225,9 +206,9 @@ int32_t tPutBlock(uint8_t *p, void *ph) { SBlock *pBlock = (SBlock *)ph; n += tPutI64v(p ? p + n : p, pBlock->minKey.version); - n += tPutI64(p ? p + n : p, pBlock->minKey.ts); + n += tPutI64v(p ? p + n : p, pBlock->minKey.ts); n += tPutI64v(p ? p + n : p, pBlock->maxKey.version); - n += tPutI64(p ? p + n : p, pBlock->maxKey.ts); + n += tPutI64v(p ? p + n : p, pBlock->maxKey.ts); n += tPutI64v(p ? p + n : p, pBlock->minVer); n += tPutI64v(p ? p + n : p, pBlock->maxVer); n += tPutI32v(p ? p + n : p, pBlock->nRow); @@ -251,9 +232,9 @@ int32_t tGetBlock(uint8_t *p, void *ph) { SBlock *pBlock = (SBlock *)ph; n += tGetI64v(p + n, &pBlock->minKey.version); - n += tGetI64(p + n, &pBlock->minKey.ts); + n += tGetI64v(p + n, &pBlock->minKey.ts); n += tGetI64v(p + n, &pBlock->maxKey.version); - n += tGetI64(p + n, &pBlock->maxKey.ts); + n += tGetI64v(p + n, &pBlock->maxKey.ts); n += tGetI64v(p + n, &pBlock->minVer); n += tGetI64v(p + n, &pBlock->maxVer); n += tGetI32v(p + n, &pBlock->nRow); @@ -1777,6 +1758,30 @@ int32_t tGetDiskDataHdr(uint8_t *p, void *ph) { } // ALGORITHM ============================== +int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) { + int32_t n = 0; + + n += tPutI16v(p ? p + n : p, pColAgg->colId); + n += tPutI16v(p ? p + n : p, pColAgg->numOfNull); + n += tPutI64(p ? p + n : p, pColAgg->sum); + n += tPutI64(p ? p + n : p, pColAgg->max); + n += tPutI64(p ? p + n : p, pColAgg->min); + + return n; +} + +int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) { + int32_t n = 0; + + n += tGetI16v(p + n, &pColAgg->colId); + n += tGetI16v(p + n, &pColAgg->numOfNull); + n += tGetI64(p + n, &pColAgg->sum); + n += tGetI64(p + n, &pColAgg->max); + n += tGetI64(p + n, &pColAgg->min); + + return n; +} + void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { SColVal colVal; SColVal *pColVal = &colVal;