提交 3e55e72a 编写于 作者: H Hongze Cheng

more code

上级 870c016d
......@@ -196,7 +196,6 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
uint8_t **ppBuf);
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
uint8_t **ppBuf);
int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck);
// tsdbMemTable ==============================================================================================
// SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
......@@ -605,10 +604,10 @@ struct SDataFWriter {
STsdb *pTsdb;
SDFileSet wSet;
TdFilePtr pHeadFD;
TdFilePtr pDataFD;
TdFilePtr pSmaFD;
TdFilePtr pLastFD;
STsdbFD *pHeadFD;
STsdbFD *pDataFD;
STsdbFD *pSmaFD;
STsdbFD *pLastFD;
SHeadFile fHead;
SDataFile fData;
......@@ -621,10 +620,10 @@ struct SDataFWriter {
struct SDataFReader {
STsdb *pTsdb;
SDFileSet *pSet;
TdFilePtr pHeadFD;
TdFilePtr pDataFD;
TdFilePtr pSmaFD;
TdFilePtr aLastFD[TSDB_MAX_SST_FILE];
STsdbFD *pHeadFD;
STsdbFD *pDataFD;
STsdbFD *pSmaFD;
STsdbFD *aLastFD[TSDB_MAX_SST_FILE];
uint8_t *aBuf[3];
};
......
......@@ -33,11 +33,9 @@ struct SLDataIter {
SVersionRange verRange;
};
static SBlockData* getCurrentBlock(SLDataIter* pIter) {
return &pIter->bData[pIter->loadIndex];
}
static SBlockData *getCurrentBlock(SLDataIter *pIter) { return &pIter->bData[pIter->loadIndex]; }
static SBlockData* getNextBlock(SLDataIter* pIter) {
static SBlockData *getNextBlock(SLDataIter *pIter) {
pIter->loadIndex ^= 1;
return getCurrentBlock(pIter);
}
......@@ -150,9 +148,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
static void findNextValidRow(SLDataIter *pIter) {
int32_t step = pIter->backward ? -1 : 1;
bool hasVal = false;
int32_t i = pIter->iRow;
SBlockData* pBlockData = getCurrentBlock(pIter);
bool hasVal = false;
int32_t i = pIter->iRow;
SBlockData *pBlockData = getCurrentBlock(pIter);
for (; i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid != NULL) {
......@@ -220,8 +218,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
return false;
}
int32_t iBlockL = pIter->iSstBlk;
SBlockData* pBlockData = getCurrentBlock(pIter);
int32_t iBlockL = pIter->iSstBlk;
SBlockData *pBlockData = getCurrentBlock(pIter);
if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
pBlockData = getNextBlock(pIter);
......@@ -306,7 +304,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
struct SLDataIter *pIterList[TSDB_DEFAULT_SST_FILE] = {0};
for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -15,9 +15,17 @@
#include "tsdb.h"
// =============== PAGE-WISE FILE ===============
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t opt, STsdbFD **ppFD) {
int32_t code = 0;
STsdbFD *pFD;
static int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) {
int32_t code = 0;
*ppFD = NULL;
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD));
if (pFD == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pFD->pFD = taosOpenFile(path, opt);
if (pFD->pFD == NULL) {
......@@ -25,7 +33,7 @@ static int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) {
goto _exit;
}
pFD->szPage = 4096;
pFD->szPage = szPage;
pFD->pgno = 0;
pFD->nBuf = 0;
pFD->pBuf = taosMemoryMalloc(pFD->szPage);
......@@ -33,17 +41,21 @@ static int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*ppFD = pFD;
_exit:
return code;
}
static void tsdbCloseFile(STsdbFD *pFD) {
static void tsdbCloseFile(STsdbFD **ppFD) {
STsdbFD *pFD = *ppFD;
taosMemoryFree(pFD->pBuf);
taosCloseFile(&pFD->pFD);
taosMemoryFree(pFD);
*ppFD = NULL;
}
static int32_t tsdbSyncFile(STsdbFD *pFD) {
static int32_t tsdbFsyncFile(STsdbFD *pFD) {
int32_t code = 0;
if (taosFsyncFile(pFD->pFD) < 0) {
......@@ -140,11 +152,18 @@ _exit:
return code;
}
static int32_t tsdbLSeekFile(STsdbFD *pFD, int64_t offset) {
int32_t code = 0;
ASSERT(0);
return code;
}
// SDataFWriter ====================================================
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
int32_t code = 0;
int32_t flag;
int64_t n;
int32_t szPage = 4096;
SDataFWriter *pWriter = NULL;
char fname[TSDB_FILENAME_LEN];
char hdr[TSDB_FHDR_SIZE] = {0};
......@@ -156,14 +175,12 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
goto _err;
}
pWriter->pTsdb = pTsdb;
pWriter->wSet = (SDFileSet){
.diskId = pSet->diskId,
.fid = pSet->fid,
.pHeadF = &pWriter->fHead,
.pDataF = &pWriter->fData,
.pSmaF = &pWriter->fSma,
.nSstF = pSet->nSstF //
};
pWriter->wSet = (SDFileSet){.diskId = pSet->diskId,
.fid = pSet->fid,
.pHeadF = &pWriter->fHead,
.pDataF = &pWriter->fData,
.pSmaF = &pWriter->fSma,
.nSstF = pSet->nSstF};
pWriter->fHead = *pSet->pHeadF;
pWriter->fData = *pSet->pDataF;
pWriter->fSma = *pSet->pSmaF;
......@@ -173,19 +190,13 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
}
// head
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
pWriter->pHeadFD = taosOpenFile(fname, flag);
if (pWriter->pHeadFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
if (code) goto _err;
n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL);
if (code) goto _err;
ASSERT(n == TSDB_FHDR_SIZE);
......@@ -193,78 +204,49 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
// data
if (pWriter->fData.size == 0) {
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
} else {
flag = TD_FILE_WRITE;
flag = TD_FILE_READ | TD_FILE_WRITE;
}
tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
pWriter->pDataFD = taosOpenFile(fname, flag);
if (pWriter->pDataFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
if (code) goto _err;
if (pWriter->fData.size == 0) {
n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL);
if (code) goto _err;
pWriter->fData.size += TSDB_FHDR_SIZE;
} else {
n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_END);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(n == pWriter->fData.size);
// code = tsdbLSeekFile(pWriter->pDataFD, 0, SEEK_END);
// if (code) goto _err;
}
// sma
if (pWriter->fSma.size == 0) {
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
} else {
flag = TD_FILE_WRITE;
flag = TD_FILE_READ | TD_FILE_WRITE;
}
tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
pWriter->pSmaFD = taosOpenFile(fname, flag);
if (pWriter->pSmaFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
if (code) goto _err;
if (pWriter->fSma.size == 0) {
n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL);
if (code) goto _err;
pWriter->fSma.size += TSDB_FHDR_SIZE;
} else {
n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_END);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(n == pWriter->fSma.size);
code = tsdbLSeekFile(pWriter->pSmaFD, 0);
if (code) goto _err;
}
// sst
ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0);
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname);
pWriter->pLastFD = taosOpenFile(fname, flag);
if (pWriter->pLastFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pLastFD);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL);
if (code) goto _err;
pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE;
*ppWriter = pWriter;
......@@ -284,46 +266,31 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
pTsdb = (*ppWriter)->pTsdb;
if (sync) {
if (taosFsyncFile((*ppWriter)->pHeadFD) < 0) {
if (tsdbFsyncFile((*ppWriter)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile((*ppWriter)->pDataFD) < 0) {
if (tsdbFsyncFile((*ppWriter)->pDataFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) {
if (tsdbFsyncFile((*ppWriter)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile((*ppWriter)->pLastFD) < 0) {
if (tsdbFsyncFile((*ppWriter)->pLastFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
if (taosCloseFile(&(*ppWriter)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&(*ppWriter)->pDataFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
tsdbCloseFile(&(*ppWriter)->pHeadFD);
tsdbCloseFile(&(*ppWriter)->pDataFD);
tsdbCloseFile(&(*ppWriter)->pSmaFD);
tsdbCloseFile(&(*ppWriter)->pLastFD);
for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) {
tFree((*ppWriter)->aBuf[iBuf]);
......@@ -346,70 +313,42 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
// head ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutHeadFile(hdr, &pWriter->fHead);
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
n = taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbLSeekFile(pWriter->pHeadFD, 0);
if (code) goto _err;
n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL);
if (code) goto _err;
// data ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutDataFile(hdr, &pWriter->fData);
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbLSeekFile(pWriter->pDataFD, 0);
if (code) goto _err;
n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL);
if (code) goto _err;
// sma ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutSmaFile(hdr, &pWriter->fSma);
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbLSeekFile(pWriter->pSmaFD, 0);
if (code) goto _err;
n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL);
if (code) goto _err;
// sst ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]);
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbLSeekFile(pWriter->pLastFD, 0);
if (code) goto _err;
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL);
if (code) goto _err;
return code;
......@@ -431,11 +370,9 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
}
// prepare
size = sizeof(uint32_t);
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx));
}
size += sizeof(TSCKSUM);
// alloc
code = tRealloc(&pWriter->aBuf[0], size);
......@@ -443,20 +380,14 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
// build
n = 0;
n = tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
n += tPutBlockIdx(pWriter->aBuf[0] + n, taosArrayGet(aBlockIdx, iBlockIdx));
}
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
ASSERT(n + sizeof(TSCKSUM) == size);
ASSERT(n == size);
// write
n = taosWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL);
if (code) goto _err;
// update
pHeadFile->offset = pHeadFile->size;
......@@ -481,24 +412,16 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc
ASSERT(mBlock->nItem > 0);
// alloc
size = sizeof(uint32_t) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM);
size = tPutMapData(NULL, mBlock);
code = tRealloc(&pWriter->aBuf[0], size);
if (code) goto _err;
// build
n = 0;
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
n += tPutMapData(pWriter->aBuf[0] + n, mBlock);
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
ASSERT(n + sizeof(TSCKSUM) == size);
n = tPutMapData(pWriter->aBuf[0] + n, mBlock);
// write
n = taosWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL);
if (code) goto _err;
// update
pBlockIdx->offset = pHeadFile->size;
......@@ -519,7 +442,7 @@ _err:
int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
int32_t code = 0;
SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1];
int64_t size;
int64_t size = 0;
int64_t n;
// check
......@@ -529,11 +452,9 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
}
// size
size = sizeof(uint32_t); // TSDB_FILE_DLMT
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL));
}
size += sizeof(TSCKSUM);
// alloc
code = tRealloc(&pWriter->aBuf[0], size);
......@@ -541,20 +462,13 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
// encode
n = 0;
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL));
}
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
ASSERT(n + sizeof(TSCKSUM) == size);
// write
n = taosWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size, NULL);
if (code) goto _err;
// update
pSstFile->offset = pSstFile->size;
......@@ -592,21 +506,14 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
// write
if (pSmaInfo->size) {
int32_t size = pSmaInfo->size + sizeof(TSCKSUM);
code = tRealloc(&pWriter->aBuf[0], size);
code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size);
if (code) goto _err;
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
int64_t n = taosWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], pSmaInfo->size, NULL);
if (code) goto _err;
pSmaInfo->offset = pWriter->fSma.size;
pWriter->fSma.size += size;
// pWriter->fSma.size += size;
}
return code;
......@@ -631,37 +538,25 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
if (code) goto _err;
// write =================
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
STsdbFD *pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
pBlkInfo->szKey = aBufN[3] + aBufN[2];
pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
int64_t n = taosWriteFile(pFD, pWriter->aBuf[3], aBufN[3]);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pFD, pWriter->aBuf[3], aBufN[3], NULL);
if (code) goto _err;
n = taosWriteFile(pFD, pWriter->aBuf[2], aBufN[2]);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pFD, pWriter->aBuf[2], aBufN[2], NULL);
if (code) goto _err;
if (aBufN[1]) {
n = taosWriteFile(pFD, pWriter->aBuf[1], aBufN[1]);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pFD, pWriter->aBuf[1], aBufN[1], NULL);
if (code) goto _err;
}
if (aBufN[0]) {
n = taosWriteFile(pFD, pWriter->aBuf[0], aBufN[0]);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pFD, pWriter->aBuf[0], aBufN[0], NULL);
if (code) goto _err;
}
// update info
......@@ -804,6 +699,7 @@ _err:
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
int32_t code = 0;
SDataFReader *pReader;
int32_t szPage = 4096;
char fname[TSDB_FILENAME_LEN];
// alloc
......@@ -818,36 +714,24 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
// open impl
// head
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
pReader->pHeadFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pHeadFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
if (code) goto _err;
// data
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
pReader->pDataFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pDataFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
if (code) goto _err;
// sma
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
pReader->pSmaFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pSmaFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
if (code) goto _err;
// sst
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
pReader->aLastFD[iSst] = taosOpenFile(fname, TD_FILE_READ);
if (pReader->aLastFD[iSst] == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aLastFD[iSst]);
if (code) goto _err;
}
*ppReader = pReader;
......@@ -864,29 +748,17 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
if (*ppReader == NULL) goto _exit;
// head
if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
tsdbCloseFile(&(*ppReader)->pHeadFD);
// data
if (taosCloseFile(&(*ppReader)->pDataFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
tsdbCloseFile(&(*ppReader)->pDataFD);
// sma
if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
tsdbCloseFile(&(*ppReader)->pSmaFD);
// sst
for (int32_t iSst = 0; iSst < (*ppReader)->pSet->nSstF; iSst++) {
if (taosCloseFile(&(*ppReader)->aLastFD[iSst]) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
tsdbCloseFile(&(*ppReader)->aLastFD[iSst]);
}
for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) {
......@@ -919,14 +791,14 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// seek
if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// // seek
// if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// read
n = taosReadFile(pReader->pHeadFD, pReader->aBuf[0], size);
n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -982,14 +854,14 @@ int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// seek
if (taosLSeekFile(pReader->aLastFD[iSst], offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// // seek
// if (taosLSeekFile(pReader->aLastFD[iSst], offset, SEEK_SET) < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// read
n = taosReadFile(pReader->aLastFD[iSst], pReader->aBuf[0], size);
n = tsdbReadFile(pReader->aLastFD[iSst], offset, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -1040,14 +912,14 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// seek
if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// // seek
// if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// read
n = taosReadFile(pReader->pHeadFD, pReader->aBuf[0], size);
n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -1097,18 +969,18 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// seek
int64_t n = taosLSeekFile(pReader->pSmaFD, pSmaInfo->offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pSmaInfo->offset) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// // seek
// int64_t n = taosLSeekFile(pReader->pSmaFD, pSmaInfo->offset, SEEK_SET);
// if (n < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// } else if (n < pSmaInfo->offset) {
// code = TSDB_CODE_FILE_CORRUPTED;
// goto _err;
// }
// read
n = taosReadFile(pReader->pSmaFD, pReader->aBuf[0], size);
int64_t n = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -1148,11 +1020,12 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
tBlockDataClear(pBlockData);
TdFilePtr pFD = fromLast ? pReader->aLastFD[0] : pReader->pDataFD; // (todo)
STsdbFD *pFD = fromLast ? pReader->aLastFD[0] : pReader->pDataFD; // (todo)
// todo: realloc pReader->aBuf[0]
// uid + version + tskey
code = tsdbReadAndCheck(pFD, pBlkInfo->offset, &pReader->aBuf[0], pBlkInfo->szKey, 1);
if (code) goto _err;
tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey); // todo
SDiskDataHdr hdr;
uint8_t *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr);
......@@ -1192,8 +1065,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
if (hdr.szBlkCol > 0) {
int64_t offset = pBlkInfo->offset + pBlkInfo->szKey;
code = tsdbReadAndCheck(pFD, offset, &pReader->aBuf[0], hdr.szBlkCol + sizeof(TSCKSUM), 1);
if (code) goto _err;
tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol + sizeof(TSCKSUM));
}
SBlockCol blockCol = {.cid = 0};
......@@ -1233,8 +1105,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + pBlockCol->offset;
int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
code = tsdbReadAndCheck(pFD, offset, &pReader->aBuf[1], size, 0);
if (code) goto _err;
tsdbReadFile(pFD, offset, pReader->aBuf[1], size);
code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
if (code) goto _err;
......@@ -1321,8 +1192,7 @@ int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk
int32_t code = 0;
// read
code = tsdbReadAndCheck(pReader->aLastFD[iSst], pSstBlk->bInfo.offset, &pReader->aBuf[0], pSstBlk->bInfo.szBlock, 0);
if (code) goto _exit;
tsdbReadFile(pReader->aLastFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock);
// decmpr
code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
......@@ -1708,3 +1578,37 @@ _err:
tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) {
int32_t code = 0;
// alloc
code = tRealloc(ppOut, size);
if (code) goto _exit;
// seek
int64_t n = taosLSeekFile(pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
// read
n = taosReadFile(pFD, *ppOut, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
// check
if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
_exit:
return code;
}
\ No newline at end of file
......@@ -2153,37 +2153,3 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
_exit:
return code;
}
int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) {
int32_t code = 0;
// alloc
code = tRealloc(ppOut, size);
if (code) goto _exit;
// seek
int64_t n = taosLSeekFile(pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
// read
n = taosReadFile(pFD, *ppOut, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
// check
if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
_exit:
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册