提交 ebfadc24 编写于 作者: H Hongze Cheng

optimize read small tsdb page

上级 e214f93d
......@@ -621,13 +621,14 @@ struct SRowMerger {
};
typedef struct {
char *path;
int32_t szPage;
int32_t flag;
TdFilePtr pFD;
int64_t pgno;
uint8_t *pBuf;
int64_t szFile;
const char *path;
int32_t oflag;
int32_t pageSize;
int32_t nCachePage;
TdFilePtr pFD;
int64_t pgno;
uint8_t *pBuf;
int64_t szFile;
} STsdbFD;
struct SDelFWriter {
......
......@@ -16,65 +16,65 @@
#include "tsdb.h"
// =============== PAGE-WISE FILE ===============
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
int32_t code = 0;
STsdbFD *pFD = NULL;
*ppFD = NULL;
static int32_t tsdbOpenFile(const char *path, int32_t pageSize, int32_t oflag, STsdbFD **ppFD) {
int32_t code = 0;
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
if (pFD == NULL) {
ppFD[0] = (STsdbFD *)taosMemoryCalloc(1, sizeof(*ppFD[0]) + strlen(path) + 1);
if (ppFD[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
return code;
}
strcpy((char *)(ppFD[0] + 1), path);
ppFD[0]->path = (char *)(ppFD[0] + 1);
ppFD[0]->oflag = oflag;
ppFD[0]->pageSize = pageSize;
if (oflag == TD_FILE_READ) {
ppFD[0]->nCachePage = 16;
} else {
ppFD[0]->nCachePage = 1;
}
pFD->path = (char *)&pFD[1];
strcpy(pFD->path, path);
pFD->szPage = szPage;
pFD->flag = flag;
pFD->pFD = taosOpenFile(path, flag);
if (pFD->pFD == NULL) {
ppFD[0]->pFD = taosOpenFile(path, oflag);
if (ppFD[0]->pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pFD);
goto _exit;
taosMemoryFreeClear(ppFD[0]);
return code;
}
pFD->szPage = szPage;
pFD->pgno = 0;
pFD->pBuf = taosMemoryCalloc(1, szPage);
if (pFD->pBuf == NULL) {
ppFD[0]->pgno = 0;
ppFD[0]->pBuf = taosMemoryCalloc(ppFD[0]->nCachePage, pageSize);
if (ppFD[0]->pBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD->pFD);
taosMemoryFree(pFD);
goto _exit;
taosCloseFile(&ppFD[0]->pFD);
taosMemoryFree(ppFD[0]);
return code;
}
// not check file size when reading data files.
if (flag != TD_FILE_READ) {
if (taosStatFile(path, &pFD->szFile, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pFD->pBuf);
taosCloseFile(&pFD->pFD);
taosMemoryFree(pFD);
goto _exit;
}
if (oflag == TD_FILE_READ) return 0;
ASSERT(pFD->szFile % szPage == 0);
pFD->szFile = pFD->szFile / szPage;
if (taosStatFile(path, &ppFD[0]->szFile, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(ppFD[0]->pBuf);
taosCloseFile(&ppFD[0]->pFD);
taosMemoryFree(ppFD[0]);
return code;
}
*ppFD = pFD;
ASSERT(ppFD[0]->szFile % pageSize == 0);
ppFD[0]->szFile = ppFD[0]->szFile / pageSize;
_exit:
return code;
return 0;
}
static void tsdbCloseFile(STsdbFD **ppFD) {
STsdbFD *pFD = *ppFD;
if (pFD) {
taosMemoryFree(pFD->pBuf);
taosCloseFile(&pFD->pFD);
taosMemoryFree(pFD);
*ppFD = NULL;
if (ppFD[0]) {
taosMemoryFree(ppFD[0]->pBuf);
taosCloseFile(&ppFD[0]->pFD);
taosMemoryFreeClear(ppFD[0]);
}
}
......@@ -82,15 +82,15 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
int32_t code = 0;
if (pFD->pgno > 0) {
int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET);
int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->pageSize), SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
taosCalcChecksumAppend(0, pFD->pBuf, pFD->pageSize);
n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->pageSize);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
......@@ -112,7 +112,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
// ASSERT(pgno <= pFD->szFile);
// seek
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
int64_t offset = PAGE_OFFSET(pgno, pFD->pageSize);
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
......@@ -120,19 +120,22 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
}
// read
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->pageSize * pFD->nCachePage);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
} else if (n < pFD->szPage) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
ASSERT(n % pFD->pageSize == 0);
// check
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
if (pgno > 1) {
for (int32_t i = 0; i < n / pFD->pageSize; ++i) {
if (!taosCheckChecksumWhole(pFD->pBuf + i * pFD->pageSize, pFD->pageSize)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
}
}
pFD->pgno = pgno;
......@@ -143,9 +146,9 @@ _exit:
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) {
int32_t code = 0;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
int64_t bOffset = fOffset % pFD->szPage;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->pageSize);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->pageSize);
int64_t bOffset = fOffset % pFD->pageSize;
int64_t n = 0;
do {
......@@ -161,7 +164,7 @@ static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf,
}
}
int64_t nWrite = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n);
int64_t nWrite = TMIN(PAGE_CONTENT_SIZE(pFD->pageSize) - bOffset, size - n);
memcpy(pFD->pBuf + bOffset, pBuf + n, nWrite);
pgno++;
......@@ -176,22 +179,24 @@ _exit:
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
int32_t code = 0;
int64_t n = 0;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
int64_t bOffset = fOffset % pFD->szPage;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->pageSize);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->pageSize);
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->pageSize);
int64_t bOffset = fOffset % pFD->pageSize;
// ASSERT(pgno && pgno <= pFD->szFile);
ASSERT(bOffset < szPgCont);
while (n < size) {
if (pFD->pgno != pgno) {
if (pFD->pgno == 0 //
|| (pgno < pFD->pgno //
|| pgno >= pFD->pgno + pFD->nCachePage)) {
code = tsdbReadFilePage(pFD, pgno);
if (code) goto _exit;
}
int64_t nRead = TMIN(szPgCont - bOffset, size - n);
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
memcpy(pBuf + n, pFD->pBuf + pFD->pageSize * (pgno - pFD->pgno) + bOffset, nRead);
n += nRead;
pgno++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册