diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 33c2c8a737ad91aede25dcafd7c2a03ca6ed5015..0e9ec7bc24ba248e752c6b5693ba48f849595cee 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -586,6 +586,12 @@ struct SDelFWriter { uint8_t *aBuf[1]; }; +struct STsdbReadSnap { + SMemTable *pMem; + SMemTable *pIMem; + STsdbFS fs; +}; + struct SDataFWriter { STsdb *pTsdb; SDFileSet wSet; @@ -603,12 +609,6 @@ struct SDataFWriter { uint8_t *aBuf[4]; }; -struct STsdbReadSnap { - SMemTable *pMem; - SMemTable *pIMem; - STsdbFS fs; -}; - struct SDataFReader { STsdb *pTsdb; SDFileSet *pSet; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 3aa3aa5182110a7eb0aaa9ab3f211089d55fa501..4bbbf24c475a0998bc3a3c9802c863c32aa736ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -23,7 +23,7 @@ typedef struct { int64_t pgno; } STsdbFD; -int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) { +static int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) { int32_t code = 0; pFD->pFD = taosOpenFile(path, opt); @@ -45,12 +45,12 @@ _exit: return code; } -void tsdbCloseFile(STsdbFD *pFD) { +static void tsdbCloseFile(STsdbFD *pFD) { taosMemoryFree(pFD->pBuf); taosCloseFile(&pFD->pFD); } -int32_t tsdbSyncFile(STsdbFD *pFD) { +static int32_t tsdbSyncFile(STsdbFD *pFD) { int32_t code = 0; if (taosFsyncFile(pFD->pFD) < 0) { @@ -62,7 +62,7 @@ _exit: return code; } -int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) { +static int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) { int32_t code = 0; int32_t n = 0; @@ -120,7 +120,7 @@ _exit: return code; } -int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) { +static int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) { int32_t code = 0; int64_t pgno = offset / pFD->szPage; @@ -147,1219 +147,1195 @@ _exit: return code; } -// SDataFReader ==================================================== -int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { +// SDataFWriter ==================================================== +int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { int32_t code = 0; - SDataFReader *pReader; + int32_t flag; + int64_t n; + SDataFWriter *pWriter = NULL; char fname[TSDB_FILENAME_LEN]; + char hdr[TSDB_FHDR_SIZE] = {0}; // alloc - pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader)); - if (pReader == NULL) { + pWriter = taosMemoryCalloc(1, sizeof(*pWriter)); + if (pWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pReader->pTsdb = pTsdb; - pReader->pSet = pSet; + pWriter->pTsdb = pTsdb; + pWriter->wSet = (SDFileSet){ + .diskId = pSet->diskId, + .fid = pSet->fid, + .pHeadF = &pWriter->fHead, + .pDataF = &pWriter->fData, + .pSmaF = &pWriter->fSma, + .nSstF = pSet->nSstF // + }; + pWriter->fHead = *pSet->pHeadF; + pWriter->fData = *pSet->pDataF; + pWriter->fSma = *pSet->pSmaF; + for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) { + pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst]; + pWriter->fSst[iSst] = *pSet->aSstF[iSst]; + } - // open impl // head - tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); - pReader->pHeadFD = taosOpenFile(fname, TD_FILE_READ); - if (pReader->pHeadFD == NULL) { + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname); + pWriter->pHeadFD = taosOpenFile(fname, flag); + if (pWriter->pHeadFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + ASSERT(n == TSDB_FHDR_SIZE); + + pWriter->fHead.size += TSDB_FHDR_SIZE; + // data - tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); - pReader->pDataFD = taosOpenFile(fname, TD_FILE_READ); - if (pReader->pDataFD == NULL) { + if (pWriter->fData.size == 0) { + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + } else { + flag = TD_FILE_WRITE; + } + tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname); + pWriter->pDataFD = taosOpenFile(fname, flag); + if (pWriter->pDataFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (pWriter->fData.size == 0) { + n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pWriter->fData.size += TSDB_FHDR_SIZE; + } else { + n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_END); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(n == pWriter->fData.size); + } // sma - tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); - pReader->pSmaFD = taosOpenFile(fname, TD_FILE_READ); - if (pReader->pSmaFD == NULL) { + if (pWriter->fSma.size == 0) { + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + } else { + flag = TD_FILE_WRITE; + } + tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname); + pWriter->pSmaFD = taosOpenFile(fname, flag); + if (pWriter->pSmaFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (pWriter->fSma.size == 0) { + n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - // sst - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname); - pReader->aLastFD[iSst] = taosOpenFile(fname, TD_FILE_READ); - if (pReader->aLastFD[iSst] == NULL) { + pWriter->fSma.size += TSDB_FHDR_SIZE; + } else { + n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_END); + if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + + ASSERT(n == pWriter->fSma.size); } - *ppReader = pReader; + // sst + ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0); + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname); + pWriter->pLastFD = taosOpenFile(fname, flag); + if (pWriter->pLastFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE; + + *ppWriter = pWriter; return code; _err: - tsdbError("vgId:%d, tsdb data file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - *ppReader = NULL; + tsdbError("vgId:%d, tsdb data file writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + *ppWriter = NULL; return code; } -int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { +int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { int32_t code = 0; - if (*ppReader == NULL) goto _exit; + STsdb *pTsdb = NULL; - // head - if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) { + if (*ppWriter == NULL) goto _exit; + + pTsdb = (*ppWriter)->pTsdb; + if (sync) { + if (taosFsyncFile((*ppWriter)->pHeadFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile((*ppWriter)->pDataFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosFsyncFile((*ppWriter)->pLastFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + + if (taosCloseFile(&(*ppWriter)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // data - if (taosCloseFile(&(*ppReader)->pDataFD) < 0) { + if (taosCloseFile(&(*ppWriter)->pDataFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // sma - if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) { + if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // sst - for (int32_t iSst = 0; iSst < (*ppReader)->pSet->nSstF; iSst++) { - if (taosCloseFile(&(*ppReader)->aLastFD[iSst]) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) { - tFree((*ppReader)->aBuf[iBuf]); + for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) { + tFree((*ppWriter)->aBuf[iBuf]); } - taosMemoryFree(*ppReader); - + taosMemoryFree(*ppWriter); _exit: - *ppReader = NULL; + *ppWriter = NULL; return code; _err: - tsdbError("vgId:%d, data file reader close failed since %s", TD_VID((*ppReader)->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, data file writer close failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { - int32_t code = 0; - int64_t offset = pReader->pSet->pHeadF->offset; - int64_t size = pReader->pSet->pHeadF->size - offset; - int64_t n; - uint32_t delimiter; - - taosArrayClear(aBlockIdx); - if (size == 0) { - goto _exit; - } +int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { + int32_t code = 0; + int64_t n; + char hdr[TSDB_FHDR_SIZE]; - // alloc - code = tRealloc(&pReader->aBuf[0], size); - if (code) goto _err; + // head ============== + memset(hdr, 0, TSDB_FHDR_SIZE); + tPutHeadFile(hdr, &pWriter->fHead); + taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - // seek - if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { + n = taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET); + if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // read - n = taosReadFile(pReader->pHeadFD, pReader->aBuf[0], size); + n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; + } + + // data ============== + memset(hdr, 0, TSDB_FHDR_SIZE); + tPutDataFile(hdr, &pWriter->fData); + taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); + + n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; + n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // decode - n = 0; - n = tGetU32(pReader->aBuf[0] + n, &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); + // sma ============== + memset(hdr, 0, TSDB_FHDR_SIZE); + tPutSmaFile(hdr, &pWriter->fSma); + taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - while (n < size - sizeof(TSCKSUM)) { - SBlockIdx blockIdx; - n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx); + n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - ASSERT(n + sizeof(TSCKSUM) == size); + // sst ============== + memset(hdr, 0, TSDB_FHDR_SIZE); + tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]); + taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); + + n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } -_exit: return code; _err: - tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { - int32_t code = 0; - int64_t offset = pReader->pSet->aSstF[iSst]->offset; - int64_t size = pReader->pSet->aSstF[iSst]->size - offset; - int64_t n; - uint32_t delimiter; +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) { + int32_t code = 0; + SHeadFile *pHeadFile = &pWriter->fHead; + int64_t size = 0; + int64_t n; - taosArrayClear(aSstBlk); - if (size == 0) { + // check + if (taosArrayGetSize(aBlockIdx) == 0) { + pHeadFile->offset = pHeadFile->size; goto _exit; } + // prepare + size = sizeof(uint32_t); + for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { + size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx)); + } + size += sizeof(TSCKSUM); + // alloc - code = tRealloc(&pReader->aBuf[0], size); + code = tRealloc(&pWriter->aBuf[0], size); if (code) goto _err; - // seek - if (taosLSeekFile(pReader->aLastFD[iSst], offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + // build + n = 0; + n = tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); + for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { + n += tPutBlockIdx(pWriter->aBuf[0] + n, taosArrayGet(aBlockIdx, iBlockIdx)); } + taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - // read - n = taosReadFile(pReader->aLastFD[iSst], pReader->aBuf[0], size); + ASSERT(n + sizeof(TSCKSUM) == size); + + // write + n = taosWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; } - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // decode - n = 0; - n = tGetU32(pReader->aBuf[0] + n, &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - - while (n < size - sizeof(TSCKSUM)) { - SSstBlk blockl; - n += tGetSstBlk(pReader->aBuf[0] + n, &blockl); - - if (taosArrayPush(aSstBlk, &blockl) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - - ASSERT(n + sizeof(TSCKSUM) == size); + // update + pHeadFile->offset = pHeadFile->size; + pHeadFile->size += size; _exit: + tsdbTrace("vgId:%d write block idx, offset:%" PRId64 " size:%" PRId64 " nBlockIdx:%d", TD_VID(pWriter->pTsdb->pVnode), + pHeadFile->offset, size, taosArrayGetSize(aBlockIdx)); return code; _err: - tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) { - int32_t code = 0; - int64_t offset = pBlockIdx->offset; - int64_t size = pBlockIdx->size; - int64_t n; - int64_t tn; +int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) { + int32_t code = 0; + SHeadFile *pHeadFile = &pWriter->fHead; + int64_t size; + int64_t n; + + ASSERT(mBlock->nItem > 0); // alloc - code = tRealloc(&pReader->aBuf[0], size); + size = sizeof(uint32_t) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); + code = tRealloc(&pWriter->aBuf[0], size); if (code) goto _err; - // seek - if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + // build + n = 0; + n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); + n += tPutMapData(pWriter->aBuf[0] + n, mBlock); + taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - // read - n = taosReadFile(pReader->pHeadFD, pReader->aBuf[0], size); + ASSERT(n + sizeof(TSCKSUM) == size); + + // write + n = taosWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; } - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // decode - n = 0; - - uint32_t delimiter; - n += tGetU32(pReader->aBuf[0] + n, &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - - tn = tGetMapData(pReader->aBuf[0] + n, mBlock); - if (tn < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - n += tn; - ASSERT(n + sizeof(TSCKSUM) == size); + // update + pBlockIdx->offset = pHeadFile->size; + pBlockIdx->size = size; + pHeadFile->size += size; + tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 + " size:%" PRId64 " nItem:%d", + TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid, + pBlockIdx->offset, pBlockIdx->size, mBlock->nItem); return code; _err: - tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) { +int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { int32_t code = 0; - SSmaInfo *pSmaInfo = &pDataBlk->smaInfo; + SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1]; + int64_t size; + int64_t n; - ASSERT(pSmaInfo->size > 0); + // check + if (taosArrayGetSize(aSstBlk) == 0) { + pSstFile->offset = pSstFile->size; + goto _exit; + } - taosArrayClear(aColumnDataAgg); + // size + size = sizeof(uint32_t); // TSDB_FILE_DLMT + for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { + size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL)); + } + size += sizeof(TSCKSUM); // alloc - int32_t size = pSmaInfo->size + sizeof(TSCKSUM); - code = tRealloc(&pReader->aBuf[0], size); + code = tRealloc(&pWriter->aBuf[0], size); if (code) goto _err; - // seek - int64_t n = taosLSeekFile(pReader->pSmaFD, pSmaInfo->offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < pSmaInfo->offset) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + // encode + n = 0; + n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); + for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { + n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL)); } + taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - // read - n = taosReadFile(pReader->pSmaFD, pReader->aBuf[0], size); + ASSERT(n + sizeof(TSCKSUM) == size); + + // write + n = taosWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; } - // decode - n = 0; - while (n < pSmaInfo->size) { - SColumnDataAgg sma; - - n += tGetColumnDataAgg(pReader->aBuf[0] + n, &sma); - if (taosArrayPush(aColumnDataAgg, &sma) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } + // update + pSstFile->offset = pSstFile->size; + pSstFile->size += size; +_exit: + tsdbTrace("vgId:%d tsdb write blockl, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), + pSstFile->offset, size); return code; _err: - tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, int8_t fromLast, - SBlockData *pBlockData) { +static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) { int32_t code = 0; - tBlockDataClear(pBlockData); + pSmaInfo->offset = 0; + pSmaInfo->size = 0; - TdFilePtr pFD = fromLast ? pReader->aLastFD[0] : pReader->pDataFD; // (todo) + // encode + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); - // uid + version + tskey - code = tsdbReadAndCheck(pFD, pBlkInfo->offset, &pReader->aBuf[0], pBlkInfo->szKey, 1); - if (code) goto _err; - SDiskDataHdr hdr; - uint8_t *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr); + if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue; - ASSERT(hdr.delimiter == TSDB_FILE_DLMT); - ASSERT(pBlockData->suid == hdr.suid); - ASSERT(pBlockData->uid == hdr.uid); + SColumnDataAgg sma; + tsdbCalcColDataSMA(pColData, &sma); - pBlockData->nRow = hdr.nRow; + code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &sma)); + if (code) goto _err; + pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &sma); + } - // uid - if (hdr.uid == 0) { - ASSERT(hdr.szUid); - code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid, - sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]); - if (code) goto _err; - } else { - ASSERT(!hdr.szUid); - } - p += hdr.szUid; - - // version - code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion, - sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]); - if (code) goto _err; - p += hdr.szVer; - - // TSKEY - code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY, - sizeof(TSKEY) * hdr.nRow, &pReader->aBuf[1]); - if (code) goto _err; - p += hdr.szKey; - - ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey - sizeof(TSCKSUM)); - - // read and decode columns - if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit; + // write + if (pSmaInfo->size) { + int32_t size = pSmaInfo->size + sizeof(TSCKSUM); - if (hdr.szBlkCol > 0) { - int64_t offset = pBlkInfo->offset + pBlkInfo->szKey; - code = tsdbReadAndCheck(pFD, offset, &pReader->aBuf[0], hdr.szBlkCol + sizeof(TSCKSUM), 1); + code = tRealloc(&pWriter->aBuf[0], size); if (code) goto _err; - } - - SBlockCol blockCol = {.cid = 0}; - SBlockCol *pBlockCol = &blockCol; - int32_t n = 0; - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { - SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - while (pBlockCol && pBlockCol->cid < pColData->cid) { - if (n < hdr.szBlkCol) { - n += tGetBlockCol(pReader->aBuf[0] + n, pBlockCol); - } else { - ASSERT(n == hdr.szBlkCol); - pBlockCol = NULL; - } + int64_t n = taosWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) { - // add a lot of NONE - for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); - if (code) goto _err; - } - } else { - ASSERT(pBlockCol->type == pColData->type); - ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - - if (pBlockCol->flag == HAS_NULL) { - // add a lot of NULL - for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); - if (code) goto _err; - } - } else { - // decode from binary - int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + pBlockCol->offset; - int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); - - code = tsdbReadAndCheck(pFD, offset, &pReader->aBuf[1], size, 0); - if (code) goto _err; - - code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]); - if (code) goto _err; - } - } + pSmaInfo->offset = pWriter->fSma.size; + pWriter->fSma.size += size; } -_exit: return code; _err: - tsdbError("vgId:%d tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, + int8_t cmprAlg, int8_t toLast) { int32_t code = 0; - code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], 0, pBlockData); - if (code) goto _err; - - if (pDataBlk->nSubBlock > 1) { - SBlockData bData1; - SBlockData bData2; + ASSERT(pBlockData->nRow > 0); - // create - code = tBlockDataCreate(&bData1); - if (code) goto _err; - code = tBlockDataCreate(&bData2); - if (code) goto _err; + pBlkInfo->offset = toLast ? pWriter->fSst[pWriter->wSet.nSstF - 1].size : pWriter->fData.size; + pBlkInfo->szBlock = 0; + pBlkInfo->szKey = 0; - // init - tBlockDataInitEx(&bData1, pBlockData); - tBlockDataInitEx(&bData2, pBlockData); + int32_t aBufN[4] = {0}; + code = tCmprBlockData(pBlockData, cmprAlg, NULL, NULL, pWriter->aBuf, aBufN); + if (code) goto _err; - for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { - code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], 0, &bData1); - if (code) { - tBlockDataDestroy(&bData1, 1); - tBlockDataDestroy(&bData2, 1); - goto _err; - } + // write ================= + TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; - code = tBlockDataCopy(pBlockData, &bData2); - if (code) { - tBlockDataDestroy(&bData1, 1); - tBlockDataDestroy(&bData2, 1); - goto _err; - } + pBlkInfo->szKey = aBufN[3] + aBufN[2]; + pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; - code = tBlockDataMerge(&bData1, &bData2, pBlockData); - if (code) { - tBlockDataDestroy(&bData1, 1); - tBlockDataDestroy(&bData2, 1); - goto _err; - } - } + int64_t n = taosWriteFile(pFD, pWriter->aBuf[3], aBufN[3]); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - tBlockDataDestroy(&bData1, 1); - tBlockDataDestroy(&bData2, 1); + n = taosWriteFile(pFD, pWriter->aBuf[2], aBufN[2]); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - return code; + if (aBufN[1]) { + n = taosWriteFile(pFD, pWriter->aBuf[1], aBufN[1]); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } -_err: - tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} + if (aBufN[0]) { + n = taosWriteFile(pFD, pWriter->aBuf[0], aBufN[0]); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } -int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) { - int32_t code = 0; + // update info + if (toLast) { + pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock; + } else { + pWriter->fData.size += pBlkInfo->szBlock; + } - code = tsdbReadBlockDataImpl(pReader, &pSstBlk->bInfo, 1, pBlockData); - if (code) goto _err; + // ================= SMA ==================== + if (pSmaInfo) { + code = tsdbWriteBlockSma(pWriter, pBlockData, pSmaInfo); + if (code) goto _err; + } +_exit: + tsdbTrace("vgId:%d tsdb write block data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d, offset:%" PRId64 " size:%d", + TD_VID(pWriter->pTsdb->pVnode), pBlockData->suid, pBlockData->uid, pBlockData->nRow, pBlkInfo->offset, + pBlkInfo->szBlock); return code; _err: - tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) { - int32_t code = 0; - - // read - code = tsdbReadAndCheck(pReader->aLastFD[iSst], pSstBlk->bInfo.offset, &pReader->aBuf[0], pSstBlk->bInfo.szBlock, 0); - if (code) goto _exit; - - // decmpr - code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); - if (code) goto _exit; - -_exit: - return code; -} +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { + int32_t code = 0; + int64_t n; + int64_t size; + TdFilePtr pOutFD = NULL; // TODO + TdFilePtr PInFD = NULL; // TODO + char fNameFrom[TSDB_FILENAME_LEN]; + char fNameTo[TSDB_FILENAME_LEN]; -// SDataFWriter ==================================================== -int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { - int32_t code = 0; - int32_t flag; - int64_t n; - SDataFWriter *pWriter = NULL; - char fname[TSDB_FILENAME_LEN]; - char hdr[TSDB_FHDR_SIZE] = {0}; + // head + tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom); + tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo); - // alloc - pWriter = taosMemoryCalloc(1, sizeof(*pWriter)); - if (pWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pWriter->pTsdb = pTsdb; - pWriter->wSet = (SDFileSet){ - .diskId = pSet->diskId, - .fid = pSet->fid, - .pHeadF = &pWriter->fHead, - .pDataF = &pWriter->fData, - .pSmaF = &pWriter->fSma, - .nSstF = pSet->nSstF // - }; - pWriter->fHead = *pSet->pHeadF; - pWriter->fData = *pSet->pDataF; - pWriter->fSma = *pSet->pSmaF; - for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) { - pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst]; - pWriter->fSst[iSst] = *pSet->aSstF[iSst]; - } - // head - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; - tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname); - pWriter->pHeadFD = taosOpenFile(fname, flag); - if (pWriter->pHeadFD == NULL) { + PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); + if (PInFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE); + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pHeadF->size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - - ASSERT(n == TSDB_FHDR_SIZE); - - pWriter->fHead.size += TSDB_FHDR_SIZE; + taosCloseFile(&pOutFD); + taosCloseFile(&PInFD); // data - if (pWriter->fData.size == 0) { - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; - } else { - flag = TD_FILE_WRITE; - } - tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname); - pWriter->pDataFD = taosOpenFile(fname, flag); - if (pWriter->pDataFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - if (pWriter->fData.size == 0) { - n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - pWriter->fData.size += TSDB_FHDR_SIZE; - } else { - n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_END); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - ASSERT(n == pWriter->fData.size); - } + tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom); + tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo); - // sma - if (pWriter->fSma.size == 0) { - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; - } else { - flag = TD_FILE_WRITE; - } - tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname); - pWriter->pSmaFD = taosOpenFile(fname, flag); - if (pWriter->pSmaFD == NULL) { + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (pWriter->fSma.size == 0) { - n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - pWriter->fSma.size += TSDB_FHDR_SIZE; - } else { - n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_END); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - ASSERT(n == pWriter->fSma.size); - } - - // sst - ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0); - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; - tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname); - pWriter->pLastFD = taosOpenFile(fname, flag); - if (pWriter->pLastFD == NULL) { + PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); + if (PInFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); + + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pDataF->size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE; - - *ppWriter = pWriter; - return code; - -_err: - tsdbError("vgId:%d, tsdb data file writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - *ppWriter = NULL; - return code; -} - -int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { - int32_t code = 0; - STsdb *pTsdb = NULL; - - if (*ppWriter == NULL) goto _exit; - - pTsdb = (*ppWriter)->pTsdb; - if (sync) { - if (taosFsyncFile((*ppWriter)->pHeadFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (taosFsyncFile((*ppWriter)->pDataFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + taosCloseFile(&pOutFD); + taosCloseFile(&PInFD); - if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + // sst + tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom); + tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo); - if (taosFsyncFile((*ppWriter)->pLastFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - if (taosCloseFile(&(*ppWriter)->pHeadFD) < 0) { + PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); + if (PInFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&(*ppWriter)->pDataFD) < 0) { + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size); + if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + taosCloseFile(&pOutFD); + taosCloseFile(&PInFD); - if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) { + // sma + tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom); + tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo); + + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) { + PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); + if (PInFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) { - tFree((*ppWriter)->aBuf[iBuf]); + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pSmaF->size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - taosMemoryFree(*ppWriter); -_exit: - *ppWriter = NULL; + taosCloseFile(&pOutFD); + taosCloseFile(&PInFD); + return code; _err: - tsdbError("vgId:%d, data file writer close failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { - int32_t code = 0; - int64_t n; - char hdr[TSDB_FHDR_SIZE]; - - // head ============== - memset(hdr, 0, TSDB_FHDR_SIZE); - tPutHeadFile(hdr, &pWriter->fHead); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); +// SDataFReader ==================================================== +int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { + int32_t code = 0; + SDataFReader *pReader; + char fname[TSDB_FILENAME_LEN]; - n = taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); + // alloc + pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader)); + if (pReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + pReader->pTsdb = pTsdb; + pReader->pSet = pSet; - n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { + // open impl + // head + tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); + pReader->pHeadFD = taosOpenFile(fname, TD_FILE_READ); + if (pReader->pHeadFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // data ============== - memset(hdr, 0, TSDB_FHDR_SIZE); - tPutDataFile(hdr, &pWriter->fData); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - - n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET); - if (n < 0) { + // data + tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); + pReader->pDataFD = taosOpenFile(fname, TD_FILE_READ); + if (pReader->pDataFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { + // sma + tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); + pReader->pSmaFD = taosOpenFile(fname, TD_FILE_READ); + if (pReader->pSmaFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // sma ============== - memset(hdr, 0, TSDB_FHDR_SIZE); - tPutSmaFile(hdr, &pWriter->fSma); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); + // sst + for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { + tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname); + pReader->aLastFD[iSst] = taosOpenFile(fname, TD_FILE_READ); + if (pReader->aLastFD[iSst] == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } - n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET); - if (n < 0) { + *ppReader = pReader; + return code; + +_err: + tsdbError("vgId:%d, tsdb data file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + *ppReader = NULL; + return code; +} + +int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { + int32_t code = 0; + if (*ppReader == NULL) goto _exit; + + // head + if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { + // data + if (taosCloseFile(&(*ppReader)->pDataFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - // sst ============== - memset(hdr, 0, TSDB_FHDR_SIZE); - tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - - n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET); - if (n < 0) { + // sma + if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + // sst + for (int32_t iSst = 0; iSst < (*ppReader)->pSet->nSstF; iSst++) { + if (taosCloseFile(&(*ppReader)->aLastFD[iSst]) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + + for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) { + tFree((*ppReader)->aBuf[iBuf]); } + taosMemoryFree(*ppReader); +_exit: + *ppReader = NULL; return code; _err: - tsdbError("vgId:%d, update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, data file reader close failed since %s", TD_VID((*ppReader)->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) { - int32_t code = 0; - SHeadFile *pHeadFile = &pWriter->fHead; - int64_t size = 0; - int64_t n; +int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { + int32_t code = 0; + int64_t offset = pReader->pSet->pHeadF->offset; + int64_t size = pReader->pSet->pHeadF->size - offset; + int64_t n; + uint32_t delimiter; - // check - if (taosArrayGetSize(aBlockIdx) == 0) { - pHeadFile->offset = pHeadFile->size; + taosArrayClear(aBlockIdx); + if (size == 0) { goto _exit; } - // prepare - size = sizeof(uint32_t); - for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { - size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx)); - } - size += sizeof(TSCKSUM); - // alloc - code = tRealloc(&pWriter->aBuf[0], size); + code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // build - n = 0; - n = tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); - for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { - n += tPutBlockIdx(pWriter->aBuf[0] + n, taosArrayGet(aBlockIdx, iBlockIdx)); + // seek + if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); - // write - n = taosWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size); + // read + n = taosReadFile(pReader->pHeadFD, pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // check + if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // decode + n = 0; + n = tGetU32(pReader->aBuf[0] + n, &delimiter); + ASSERT(delimiter == TSDB_FILE_DLMT); + + while (n < size - sizeof(TSCKSUM)) { + SBlockIdx blockIdx; + n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx); + + if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } } - // update - pHeadFile->offset = pHeadFile->size; - pHeadFile->size += size; + ASSERT(n + sizeof(TSCKSUM) == size); _exit: - tsdbTrace("vgId:%d write block idx, offset:%" PRId64 " size:%" PRId64 " nBlockIdx:%d", TD_VID(pWriter->pTsdb->pVnode), - pHeadFile->offset, size, taosArrayGetSize(aBlockIdx)); return code; _err: - tsdbError("vgId:%d, write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) { - int32_t code = 0; - SHeadFile *pHeadFile = &pWriter->fHead; - int64_t size; - int64_t n; +int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { + int32_t code = 0; + int64_t offset = pReader->pSet->aSstF[iSst]->offset; + int64_t size = pReader->pSet->aSstF[iSst]->size - offset; + int64_t n; + uint32_t delimiter; - ASSERT(mBlock->nItem > 0); + taosArrayClear(aSstBlk); + if (size == 0) { + goto _exit; + } // alloc - size = sizeof(uint32_t) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); - code = tRealloc(&pWriter->aBuf[0], size); + code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // build - n = 0; - n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); - n += tPutMapData(pWriter->aBuf[0] + n, mBlock); - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); + // seek + if (taosLSeekFile(pReader->aLastFD[iSst], offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - // write - n = taosWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size); + // read + n = taosReadFile(pReader->aLastFD[iSst], pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; } - // update - pBlockIdx->offset = pHeadFile->size; - pBlockIdx->size = size; - pHeadFile->size += size; + // check + if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } - tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 - " size:%" PRId64 " nItem:%d", - TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid, - pBlockIdx->offset, pBlockIdx->size, mBlock->nItem); + // decode + n = 0; + n = tGetU32(pReader->aBuf[0] + n, &delimiter); + ASSERT(delimiter == TSDB_FILE_DLMT); + + while (n < size - sizeof(TSCKSUM)) { + SSstBlk blockl; + n += tGetSstBlk(pReader->aBuf[0] + n, &blockl); + + if (taosArrayPush(aSstBlk, &blockl) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + ASSERT(n + sizeof(TSCKSUM) == size); + +_exit: return code; _err: - tsdbError("vgId:%d, write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { - int32_t code = 0; - SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1]; - int64_t size; - int64_t n; - - // check - if (taosArrayGetSize(aSstBlk) == 0) { - pSstFile->offset = pSstFile->size; - goto _exit; - } - - // size - size = sizeof(uint32_t); // TSDB_FILE_DLMT - for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { - size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL)); - } - size += sizeof(TSCKSUM); +int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) { + int32_t code = 0; + int64_t offset = pBlockIdx->offset; + int64_t size = pBlockIdx->size; + int64_t n; + int64_t tn; // alloc - code = tRealloc(&pWriter->aBuf[0], size); + code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // encode - n = 0; - n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); - for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { - n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL)); + // seek + if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); - // write - n = taosWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size); + // read + n = taosReadFile(pReader->pHeadFD, pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; } - // update - pSstFile->offset = pSstFile->size; - pSstFile->size += size; - -_exit: - tsdbTrace("vgId:%d tsdb write blockl, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), - pSstFile->offset, size); - return code; + // check + if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } -_err: - tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); - return code; -} + // decode + n = 0; -static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SDataBlk *pDataBlk) { - for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; + uint32_t delimiter; + n += tGetU32(pReader->aBuf[0] + n, &delimiter); + ASSERT(delimiter == TSDB_FILE_DLMT); - if (iRow == 0) { - if (tsdbKeyCmprFn(&pDataBlk->minKey, &key) > 0) { - pDataBlk->minKey = key; - } - } else { - if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { - pDataBlk->hasDup = 1; - } - } + tn = tGetMapData(pReader->aBuf[0] + n, mBlock); + if (tn < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + n += tn; + ASSERT(n + sizeof(TSCKSUM) == size); - if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pDataBlk->maxKey, &key) < 0) { - pDataBlk->maxKey = key; - } + return code; - pDataBlk->minVer = TMIN(pDataBlk->minVer, key.version); - pDataBlk->maxVer = TMAX(pDataBlk->maxVer, key.version); - } - pDataBlk->nRow += pBlockData->nRow; +_err: + tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; } -static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) { - int32_t code = 0; - - pSmaInfo->offset = 0; - pSmaInfo->size = 0; +int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) { + int32_t code = 0; + SSmaInfo *pSmaInfo = &pDataBlk->smaInfo; - // encode - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { - SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + ASSERT(pSmaInfo->size > 0); - if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue; + taosArrayClear(aColumnDataAgg); - SColumnDataAgg sma; - tsdbCalcColDataSMA(pColData, &sma); + // alloc + int32_t size = pSmaInfo->size + sizeof(TSCKSUM); + code = tRealloc(&pReader->aBuf[0], size); + if (code) goto _err; - code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &sma)); - if (code) goto _err; - pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &sma); + // seek + int64_t n = taosLSeekFile(pReader->pSmaFD, pSmaInfo->offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < pSmaInfo->offset) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; } - // write - if (pSmaInfo->size) { - int32_t size = pSmaInfo->size + sizeof(TSCKSUM); + // read + n = taosReadFile(pReader->pSmaFD, pReader->aBuf[0], size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } - code = tRealloc(&pWriter->aBuf[0], size); - if (code) goto _err; + // check + if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); + // decode + n = 0; + while (n < pSmaInfo->size) { + SColumnDataAgg sma; - int64_t n = taosWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); + n += tGetColumnDataAgg(pReader->aBuf[0] + n, &sma); + if (taosArrayPush(aColumnDataAgg, &sma) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - - pSmaInfo->offset = pWriter->fSma.size; - pWriter->fSma.size += size; } return code; _err: - tsdbError("vgId:%d tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, - int8_t cmprAlg, int8_t toLast) { +static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, int8_t fromLast, + SBlockData *pBlockData) { int32_t code = 0; - ASSERT(pBlockData->nRow > 0); + tBlockDataClear(pBlockData); + + TdFilePtr pFD = fromLast ? pReader->aLastFD[0] : pReader->pDataFD; // (todo) + + // uid + version + tskey + code = tsdbReadAndCheck(pFD, pBlkInfo->offset, &pReader->aBuf[0], pBlkInfo->szKey, 1); + if (code) goto _err; + SDiskDataHdr hdr; + uint8_t *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr); + + ASSERT(hdr.delimiter == TSDB_FILE_DLMT); + ASSERT(pBlockData->suid == hdr.suid); + ASSERT(pBlockData->uid == hdr.uid); + + pBlockData->nRow = hdr.nRow; + + // uid + if (hdr.uid == 0) { + ASSERT(hdr.szUid); + code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid, + sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]); + if (code) goto _err; + } else { + ASSERT(!hdr.szUid); + } + p += hdr.szUid; - pBlkInfo->offset = toLast ? pWriter->fSst[pWriter->wSet.nSstF - 1].size : pWriter->fData.size; - pBlkInfo->szBlock = 0; - pBlkInfo->szKey = 0; + // version + code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion, + sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]); + if (code) goto _err; + p += hdr.szVer; - int32_t aBufN[4] = {0}; - code = tCmprBlockData(pBlockData, cmprAlg, NULL, NULL, pWriter->aBuf, aBufN); + // TSKEY + code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY, + sizeof(TSKEY) * hdr.nRow, &pReader->aBuf[1]); if (code) goto _err; + p += hdr.szKey; - // write ================= - TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; + ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey - sizeof(TSCKSUM)); - pBlkInfo->szKey = aBufN[3] + aBufN[2]; - pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; + // read and decode columns + if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit; - int64_t n = taosWriteFile(pFD, pWriter->aBuf[3], aBufN[3]); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + if (hdr.szBlkCol > 0) { + int64_t offset = pBlkInfo->offset + pBlkInfo->szKey; + code = tsdbReadAndCheck(pFD, offset, &pReader->aBuf[0], hdr.szBlkCol + sizeof(TSCKSUM), 1); + if (code) goto _err; } - n = taosWriteFile(pFD, pWriter->aBuf[2], aBufN[2]); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + SBlockCol blockCol = {.cid = 0}; + SBlockCol *pBlockCol = &blockCol; + int32_t n = 0; - if (aBufN[1]) { - n = taosWriteFile(pFD, pWriter->aBuf[1], aBufN[1]); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - } + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); - if (aBufN[0]) { - n = taosWriteFile(pFD, pWriter->aBuf[0], aBufN[0]); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + while (pBlockCol && pBlockCol->cid < pColData->cid) { + if (n < hdr.szBlkCol) { + n += tGetBlockCol(pReader->aBuf[0] + n, pBlockCol); + } else { + ASSERT(n == hdr.szBlkCol); + pBlockCol = NULL; + } } - } - // update info - if (toLast) { - pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock; - } else { - pWriter->fData.size += pBlkInfo->szBlock; - } + if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) { + // add a lot of NONE + for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _err; + } + } else { + ASSERT(pBlockCol->type == pColData->type); + ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - // ================= SMA ==================== - if (pSmaInfo) { - code = tsdbWriteBlockSma(pWriter, pBlockData, pSmaInfo); - if (code) goto _err; + if (pBlockCol->flag == HAS_NULL) { + // add a lot of NULL + for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); + if (code) goto _err; + } + } else { + // decode from binary + int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + pBlockCol->offset; + int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); + + code = tsdbReadAndCheck(pFD, offset, &pReader->aBuf[1], size, 0); + if (code) goto _err; + + code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]); + if (code) goto _err; + } + } } _exit: - tsdbTrace("vgId:%d tsdb write block data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d, offset:%" PRId64 " size:%d", - TD_VID(pWriter->pTsdb->pVnode), pBlockData->suid, pBlockData->uid, pBlockData->nRow, pBlkInfo->offset, - pBlkInfo->szBlock); return code; _err: - tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { - int32_t code = 0; - int64_t n; - int64_t size; - TdFilePtr pOutFD = NULL; // TODO - TdFilePtr PInFD = NULL; // TODO - char fNameFrom[TSDB_FILENAME_LEN]; - char fNameTo[TSDB_FILENAME_LEN]; +int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { + int32_t code = 0; - // head - tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom); - tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo); + code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], 0, pBlockData); + if (code) goto _err; - pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); - if (pOutFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + if (pDataBlk->nSubBlock > 1) { + SBlockData bData1; + SBlockData bData2; - PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); - if (PInFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + // create + code = tBlockDataCreate(&bData1); + if (code) goto _err; + code = tBlockDataCreate(&bData2); + if (code) goto _err; - n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pHeadF->size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - taosCloseFile(&pOutFD); - taosCloseFile(&PInFD); + // init + tBlockDataInitEx(&bData1, pBlockData); + tBlockDataInitEx(&bData2, pBlockData); - // data - tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom); - tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo); + for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { + code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], 0, &bData1); + if (code) { + tBlockDataDestroy(&bData1, 1); + tBlockDataDestroy(&bData2, 1); + goto _err; + } - pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); - if (pOutFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tBlockDataCopy(pBlockData, &bData2); + if (code) { + tBlockDataDestroy(&bData1, 1); + tBlockDataDestroy(&bData2, 1); + goto _err; + } - PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); - if (PInFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tBlockDataMerge(&bData1, &bData2, pBlockData); + if (code) { + tBlockDataDestroy(&bData1, 1); + tBlockDataDestroy(&bData2, 1); + goto _err; + } + } - n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pDataF->size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + tBlockDataDestroy(&bData1, 1); + tBlockDataDestroy(&bData2, 1); } - taosCloseFile(&pOutFD); - taosCloseFile(&PInFD); - // sst - tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom); - tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo); + return code; - pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); - if (pOutFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } +_err: + tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; +} - PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); - if (PInFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } +int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) { + int32_t code = 0; - n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - taosCloseFile(&pOutFD); - taosCloseFile(&PInFD); + code = tsdbReadBlockDataImpl(pReader, &pSstBlk->bInfo, 1, pBlockData); + if (code) goto _err; - // sma - tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom); - tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo); + return code; - pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); - if (pOutFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } +_err: + tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; +} - PInFD = taosOpenFile(fNameFrom, TD_FILE_READ); - if (PInFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } +int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) { + int32_t code = 0; - n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pSmaF->size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - taosCloseFile(&pOutFD); - taosCloseFile(&PInFD); + // read + code = tsdbReadAndCheck(pReader->aLastFD[iSst], pSstBlk->bInfo.offset, &pReader->aBuf[0], pSstBlk->bInfo.szBlock, 0); + if (code) goto _exit; - return code; + // decmpr + code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); + if (code) goto _exit; -_err: - tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: return code; } @@ -1558,7 +1534,6 @@ _err: tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } - // SDelFReader ==================================================== struct SDelFReader { STsdb *pTsdb;