提交 7e1306a9 编写于 作者: H Hongze Cheng

more code

上级 b85bde9a
......@@ -72,6 +72,7 @@ typedef struct SLDataIter SLDataIter;
#define TSDB_MAX_SST_FILE 16
#define TSDB_DEFAULT_SST_FILE 8
#define TSDB_FHDR_SIZE 512
#define TSDB_DEFAULT_PAGE_SIZE 4096
#define HAS_NONE ((int8_t)0x1)
#define HAS_NULL ((int8_t)0x2)
......@@ -578,11 +579,20 @@ struct SRowMerger {
SArray *pArray; // SArray<SColVal>
};
typedef struct {
char *path;
int32_t szPage;
int32_t flag;
TdFilePtr pFD;
int64_t pgno;
uint8_t *pBuf;
int64_t szFile;
} STsdbFD;
struct SDelFWriter {
STsdb *pTsdb;
SDelFile fDel;
TdFilePtr pWriteH;
STsdbFD *pWriteH;
uint8_t *aBuf[1];
};
......@@ -592,16 +602,6 @@ struct STsdbReadSnap {
STsdbFS fs;
};
typedef struct {
char *path;
int32_t szPage;
int32_t flag;
TdFilePtr pFD;
int64_t pgno;
uint8_t *pBuf;
int64_t szFile;
} STsdbFD;
struct SDataFWriter {
STsdb *pTsdb;
SDFileSet wSet;
......
......@@ -15,12 +15,17 @@
#include "tsdb.h"
#define LOGIC_TO_FILE_SIZE(LSIZE, PAGE) (0) // todo
// =================================================================================================
static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) {
int32_t n = 0;
int8_t hasDel = pFS->pDelFile ? 1 : 0;
uint32_t nSet = taosArrayGetSize(pFS->aDFileSet);
// version
n += tPutI8(p ? p + n : p, 0);
// SDelFile
n += tPutI8(p ? p + n : p, hasDel);
if (hasDel) {
......@@ -292,7 +297,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size != pSet->pHeadF->size) {
if (size != LOGIC_TO_FILE_SIZE(pSet->pHeadF->size, TSDB_DEFAULT_PAGE_SIZE)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
......@@ -303,10 +308,10 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size < pSet->pDataF->size) {
if (size < LOGIC_TO_FILE_SIZE(pSet->pDataF->size, TSDB_DEFAULT_PAGE_SIZE)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
} else if (size > pSet->pDataF->size) {
} else if (size > LOGIC_TO_FILE_SIZE(pSet->pDataF->size, TSDB_DEFAULT_PAGE_SIZE)) {
code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
if (code) goto _err;
}
......@@ -317,10 +322,10 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size < pSet->pSmaF->size) {
if (size < LOGIC_TO_FILE_SIZE(pSet->pSmaF->size, TSDB_DEFAULT_PAGE_SIZE)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
} else if (size > pSet->pSmaF->size) {
} else if (size > LOGIC_TO_FILE_SIZE(pSet->pSmaF->size, TSDB_DEFAULT_PAGE_SIZE)) {
code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
if (code) goto _err;
}
......@@ -332,7 +337,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size != pSet->aSstF[iSst]->size) {
if (size != LOGIC_TO_FILE_SIZE(pSet->aSstF[iSst]->size, TSDB_DEFAULT_PAGE_SIZE)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
......@@ -364,10 +369,12 @@ static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) {
int32_t code = 0;
int8_t hasDel;
uint32_t nSet;
int32_t n;
int32_t n = 0;
// version
n += tGetI8(pData + n, NULL);
// SDelFile
n = 0;
n += tGetI8(pData + n, &hasDel);
if (hasDel) {
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
......
......@@ -15,8 +15,6 @@
#include "tsdb.h"
#define TSDB_DEFAULT_PAGE_SIZE 4096
// =============== PAGE-WISE FILE ===============
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
#define LOGIC_TO_FILE_OFFSET(OFFSET, PAGE) \
......@@ -1137,7 +1135,7 @@ _err:
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
int32_t code = 0;
char fname[TSDB_FILENAME_LEN];
char hdr[TSDB_FHDR_SIZE] = {0};
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
SDelFWriter *pDelFWriter;
int64_t n;
......@@ -1151,18 +1149,13 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
pDelFWriter->fDel = *pFile;
tsdbDelFileName(pTsdb, pFile, fname);
pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE);
if (pDelFWriter->pWriteH == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code =
tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH);
if (code) goto _err;
// update header
n = taosWriteFile(pDelFWriter->pWriteH, &hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pDelFWriter->pWriteH, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
pDelFWriter->fDel.offset = 0;
......@@ -1182,16 +1175,13 @@ int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
STsdb *pTsdb = pWriter->pTsdb;
// sync
if (sync && taosFsyncFile(pWriter->pWriteH) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
if (sync) {
code = tsdbFsyncFile(pWriter->pWriteH);
if (code) goto _err;
}
// close
if (taosCloseFile(&pWriter->pWriteH) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
tsdbCloseFile(&pWriter->pWriteH);
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) {
tFree(pWriter->aBuf[iBuf]);
......@@ -1212,11 +1202,10 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelId
int64_t n;
// prepare
size = sizeof(uint32_t);
size = 0;
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData));
}
size += sizeof(TSCKSUM);
// alloc
code = tRealloc(&pWriter->aBuf[0], size);
......@@ -1224,22 +1213,14 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelId
// build
n = 0;
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
n += tPutDelData(pWriter->aBuf[0] + n, taosArrayGet(aDelData, iDelData));
}
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
ASSERT(n + sizeof(TSCKSUM) == size);
ASSERT(n == size);
// write
n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(n == size);
code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], size);
if (code) goto _err;
// update
pDelIdx->offset = pWriter->fDel.size;
......@@ -1260,11 +1241,10 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
SDelIdx *pDelIdx;
// prepare
size = sizeof(uint32_t);
size = 0;
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx));
}
size += sizeof(TSCKSUM);
// alloc
code = tRealloc(&pWriter->aBuf[0], size);
......@@ -1272,20 +1252,14 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
// build
n = 0;
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx));
}
taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
ASSERT(n + sizeof(TSCKSUM) == size);
ASSERT(n == size);
// write
n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], size);
if (code) goto _err;
// update
pWriter->fDel.offset = pWriter->fDel.size;
......@@ -1300,27 +1274,16 @@ _err:
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) {
int32_t code = 0;
char hdr[TSDB_FHDR_SIZE];
char hdr[TSDB_FHDR_SIZE] = {0};
int64_t size = TSDB_FHDR_SIZE;
int64_t n;
// build
memset(hdr, 0, size);
tPutDelFile(hdr, &pWriter->fDel);
taosCalcChecksumAppend(0, hdr, size);
// seek
if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pWriteH, hdr, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbWriteFile(pWriter->pWriteH, 0, hdr, size);
if (code) goto _err;
return code;
......@@ -1332,8 +1295,7 @@ _err:
struct SDelFReader {
STsdb *pTsdb;
SDelFile fDel;
TdFilePtr pReadH;
STsdbFD *pReadH;
uint8_t *aBuf[1];
};
......@@ -1355,14 +1317,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
pDelFReader->fDel = *pFile;
tsdbDelFileName(pTsdb, pFile, fname);
pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ);
if (pDelFReader->pReadH == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pDelFReader);
goto _err;
}
code = tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ, &pDelFReader->pReadH);
if (code) goto _err;
_exit:
*ppReader = pDelFReader;
return code;
......@@ -1377,10 +1334,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
SDelFReader *pReader = *ppReader;
if (pReader) {
if (taosCloseFile(&pReader->pReadH) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
tsdbCloseFile(&pReader->pReadH);
for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
tFree(pReader->aBuf[iBuf]);
}
......@@ -1400,38 +1354,17 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
taosArrayClear(aDelData);
// seek
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// alloc
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// read
n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
if (code) goto _err;
// // decode
n = 0;
uint32_t delimiter;
n += tGetU32(pReader->aBuf[0] + n, &delimiter);
while (n < size - sizeof(TSCKSUM)) {
while (n < size) {
SDelData delData;
n += tGetDelData(pReader->aBuf[0] + n, &delData);
......@@ -1440,8 +1373,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
goto _err;
}
}
ASSERT(n == size - sizeof(TSCKSUM));
ASSERT(n == size);
return code;
......@@ -1458,39 +1390,17 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
taosArrayClear(aDelIdx);
// seek
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// alloc
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// read
n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
if (code) goto _err;
// decode
n = 0;
uint32_t delimiter;
n += tGetU32(pReader->aBuf[0] + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
while (n < size - sizeof(TSCKSUM)) {
while (n < size) {
SDelIdx delIdx;
n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
......@@ -1501,7 +1411,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
}
}
ASSERT(n == size - sizeof(TSCKSUM));
ASSERT(n == size);
return code;
......@@ -1509,37 +1419,3 @@ _err:
tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
\ No newline at end of file
static int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) {
int32_t code = 0;
// alloc
code = tRealloc(ppOut, size);
if (code) goto _exit;
// seek
int64_t n = taosLSeekFile(pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
// read
n = taosReadFile(pFD, *ppOut, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
// check
if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
_exit:
return code;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册