diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7546b0943e5642317a735626b01a2e0c393e2751..98a3ee9fdb2b871995c1306ea70d8af8049fd8ce 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -633,9 +633,10 @@ typedef struct SMergeTree { struct SLDataIter *pIter; } SMergeTree; -int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange); -void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter); -bool tMergeTreeNext(SMergeTree *pMTree); +int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid, + STimeWindow *pTimeWindow, SVersionRange *pVerRange); +void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter); +bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index dbaf9b234c5a7bca7b41c2f5ca943234c05e129f..a969a3c080a915007e08e6cdac4a54a248a0fdb2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1608,3 +1608,135 @@ _err: tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } + +// =============== PAGE-WISE FILE =============== +typedef struct { + TdFilePtr pFD; + int32_t szPage; + int32_t nBuf; + uint8_t *pBuf; + int64_t pgno; +} STsdbFD; + +int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) { + int32_t code = 0; + + pFD->pFD = taosOpenFile(path, opt); + if (pFD->pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + pFD->szPage = 4096; + pFD->nBuf = 0; + pFD->pBuf = taosMemoryMalloc(pFD->szPage); + if (pFD->pBuf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + +_exit: + return code; +} + +void tsdbCloseFile(STsdbFD *pFD) { + taosMemoryFree(pFD->pBuf); + taosCloseFile(&pFD->pFD); +} + +int32_t tsdbSyncFile(STsdbFD *pFD) { + int32_t code = 0; + + if (taosFsyncFile(pFD->pFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + +_exit: + return code; +} + +int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf) { + int32_t code = 0; + + int32_t n = 0; + while (n < nBuf) { + int32_t remain = pFD->szPage - pFD->nBuf - sizeof(TSCKSUM); + int32_t size = TMIN(remain, nBuf - n); + + memcpy(pFD->pBuf + pFD->nBuf, pBuf + n, size); + n += size; + pFD->nBuf += size; + + if (pFD->nBuf + sizeof(TSCKSUM) == pFD->szPage) { + taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage); + + int64_t n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + pFD->nBuf = 0; + } + } + +_exit: + return code; +} + +static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { + int32_t code = 0; + + int64_t n = taosLSeekFile(pFD->pFD, pgno * pFD->szPage, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } else if (n < pFD->szPage) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + pFD->pgno = pgno; + +_exit: + return code; +} + +int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) { + int32_t code = 0; + + int64_t pgno = offset / pFD->szPage; + int64_t n = 0; + if (pFD->pgno == pgno) { + int64_t bOff = offset % pFD->szPage; + int64_t nRead = TMIN(pFD->szPage - bOff - sizeof(TSCKSUM), count); + memcpy(pBuf + n, pFD->pBuf + bOff, nRead); + n = nRead; + } + + while (n < count) { + code = tsdbReadFilePage(pFD, pgno); + if (code) goto _exit; + + pgno++; + + int64_t nRead = TMIN(pFD->szPage - sizeof(TSCKSUM), count - n); + memcpy(pBuf + n, pFD->pBuf, nRead); + n += nRead; + } + +_exit: + return code; +} \ No newline at end of file