提交 b85bde9a 编写于 作者: H Hongze Cheng

more code

上级 3b0008a6
......@@ -598,8 +598,8 @@ typedef struct {
int32_t flag;
TdFilePtr pFD;
int64_t pgno;
int32_t nBuf;
uint8_t *pBuf;
int64_t szFile;
} STsdbFD;
struct SDataFWriter {
......@@ -609,7 +609,7 @@ struct SDataFWriter {
STsdbFD *pHeadFD;
STsdbFD *pDataFD;
STsdbFD *pSmaFD;
STsdbFD *pLastFD;
STsdbFD *pSstFD;
SHeadFile fHead;
SDataFile fData;
......
......@@ -48,13 +48,18 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd
}
pFD->szPage = szPage;
pFD->pgno = 0;
pFD->nBuf = 0;
pFD->pBuf = taosMemoryMalloc(szPage);
if (pFD->pBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pFD);
goto _exit;
}
if (taosStatFile(path, &pFD->szFile, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
ASSERT(pFD->szFile % szPage == 0);
pFD->szFile = pFD->szFile / szPage;
*ppFD = pFD;
_exit:
......@@ -69,42 +74,29 @@ static void tsdbCloseFile(STsdbFD **ppFD) {
*ppFD = NULL;
}
static int32_t tsdbFsyncFile(STsdbFD *pFD) {
static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
int32_t code = 0;
if (taosFsyncFile(pFD->pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
_exit:
return code;
}
static int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) {
int32_t code = 0;
int32_t n = 0;
while (n < nBuf) {
int32_t remain = pFD->szPage - pFD->nBuf - sizeof(TSCKSUM);
int32_t size = TMIN(remain, nBuf - n);
memcpy(pFD->pBuf + pFD->nBuf, pBuf + n, size);
n += size;
pFD->nBuf += size;
if (pFD->pgno > 0) {
int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
if (pFD->nBuf + sizeof(TSCKSUM) == pFD->szPage) {
taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
int64_t n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
pFD->nBuf = 0;
if (pFD->szFile < pFD->pgno) {
pFD->szFile = pFD->szFile;
}
}
pFD->pgno = 0;
_exit:
return code;
......@@ -113,6 +105,8 @@ _exit:
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
int32_t code = 0;
ASSERT(pgno <= pFD->szFile);
// seek
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
......@@ -143,6 +137,38 @@ _exit:
return code;
}
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
int32_t code = 0;
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
int64_t bOffset = fOffset % pFD->szPage;
int64_t n = 0;
do {
if (pFD->pgno != pgno) {
code = tsdbWriteFilePage(pFD);
if (code) goto _exit;
if (pgno < pFD->szFile) {
code = tsdbReadFilePage(pFD, pgno);
if (code) goto _exit;
} else {
pFD->pgno = pgno;
}
}
int64_t nRead = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n);
memcpy(pFD->pBuf + bOffset, pBuf + n, nRead);
pgno++;
bOffset = 0;
n += nRead;
} while (n < size);
_exit:
return code;
}
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
int32_t code = 0;
int64_t n;
......@@ -177,9 +203,18 @@ _exit:
return code;
}
static int32_t tsdbLSeekFile(STsdbFD *pFD, int64_t offset) {
static int32_t tsdbFsyncFile(STsdbFD *pFD) {
int32_t code = 0;
ASSERT(0);
code = tsdbWriteFilePage(pFD);
if (code) goto _exit;
if (taosFsyncFile(pFD->pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
_exit:
return code;
}
......@@ -220,11 +255,8 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL);
code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
ASSERT(n == TSDB_FHDR_SIZE);
pWriter->fHead.size += TSDB_FHDR_SIZE;
// data
......@@ -237,12 +269,9 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
if (code) goto _err;
if (pWriter->fData.size == 0) {
code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL);
code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
pWriter->fData.size += TSDB_FHDR_SIZE;
} else {
// code = tsdbLSeekFile(pWriter->pDataFD, 0, SEEK_END);
// if (code) goto _err;
}
// sma
......@@ -255,22 +284,19 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
if (code) goto _err;
if (pWriter->fSma.size == 0) {
code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL);
code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
pWriter->fSma.size += TSDB_FHDR_SIZE;
} else {
code = tsdbLSeekFile(pWriter->pSmaFD, 0);
if (code) goto _err;
}
// sst
ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0);
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pLastFD);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSstFD);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL);
code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE;
......@@ -291,31 +317,23 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
pTsdb = (*ppWriter)->pTsdb;
if (sync) {
if (tsdbFsyncFile((*ppWriter)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbFsyncFile((*ppWriter)->pHeadFD);
if (code) goto _err;
if (tsdbFsyncFile((*ppWriter)->pDataFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbFsyncFile((*ppWriter)->pDataFD);
if (code) goto _err;
if (tsdbFsyncFile((*ppWriter)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbFsyncFile((*ppWriter)->pSmaFD);
if (code) goto _err;
if (tsdbFsyncFile((*ppWriter)->pLastFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbFsyncFile((*ppWriter)->pSstFD);
if (code) goto _err;
}
tsdbCloseFile(&(*ppWriter)->pHeadFD);
tsdbCloseFile(&(*ppWriter)->pDataFD);
tsdbCloseFile(&(*ppWriter)->pSmaFD);
tsdbCloseFile(&(*ppWriter)->pLastFD);
tsdbCloseFile(&(*ppWriter)->pSstFD);
for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) {
tFree((*ppWriter)->aBuf[iBuf]);
......@@ -338,41 +356,25 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
// head ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutHeadFile(hdr, &pWriter->fHead);
code = tsdbLSeekFile(pWriter->pHeadFD, 0);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL);
code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
// data ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutDataFile(hdr, &pWriter->fData);
code = tsdbLSeekFile(pWriter->pDataFD, 0);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL);
code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
// sma ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutSmaFile(hdr, &pWriter->fSma);
code = tsdbLSeekFile(pWriter->pSmaFD, 0);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL);
code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
// sst ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]);
code = tsdbLSeekFile(pWriter->pLastFD, 0);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL);
code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
return code;
......@@ -385,7 +387,7 @@ _err:
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->fHead;
int64_t size = 0;
int64_t size;
int64_t n;
// check
......@@ -395,6 +397,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
}
// prepare
size = 0;
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx));
}
......@@ -411,7 +414,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
ASSERT(n == size);
// write
code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL);
code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
if (code) goto _err;
// update
......@@ -442,10 +445,10 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc
if (code) goto _err;
// build
n = tPutMapData(pWriter->aBuf[0] + n, mBlock);
n = tPutMapData(pWriter->aBuf[0], mBlock);
// write
code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL);
code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
if (code) goto _err;
// update
......@@ -467,7 +470,7 @@ _err:
int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
int32_t code = 0;
SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1];
int64_t size = 0;
int64_t size;
int64_t n;
// check
......@@ -477,6 +480,7 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
}
// size
size = 0;
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL));
}
......@@ -492,7 +496,7 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
}
// write
code = tsdbWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size, NULL);
code = tsdbWriteFile(pWriter->pSstFD, pSstFile->size, pWriter->aBuf[0], size);
if (code) goto _err;
// update
......@@ -500,7 +504,7 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
pSstFile->size += size;
_exit:
tsdbTrace("vgId:%d tsdb write blockl, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
tsdbTrace("vgId:%d tsdb write sst block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
pSstFile->offset, size);
return code;
......@@ -534,11 +538,11 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], pSmaInfo->size, NULL);
code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size);
if (code) goto _err;
pSmaInfo->offset = pWriter->fSma.size;
// pWriter->fSma.size += size;
pWriter->fSma.size += pSmaInfo->size;
}
return code;
......@@ -554,7 +558,11 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
ASSERT(pBlockData->nRow > 0);
pBlkInfo->offset = toLast ? pWriter->fSst[pWriter->wSet.nSstF - 1].size : pWriter->fData.size;
if (toLast) {
pBlkInfo->offset = pWriter->fSst[pWriter->wSet.nSstF - 1].size;
} else {
pBlkInfo->offset = pWriter->fData.size;
}
pBlkInfo->szBlock = 0;
pBlkInfo->szKey = 0;
......@@ -563,24 +571,28 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
if (code) goto _err;
// write =================
STsdbFD *pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
STsdbFD *pFD = toLast ? pWriter->pSstFD : pWriter->pDataFD;
pBlkInfo->szKey = aBufN[3] + aBufN[2];
pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
code = tsdbWriteFile(pFD, pWriter->aBuf[3], aBufN[3], NULL);
int64_t offset = pBlkInfo->offset;
code = tsdbWriteFile(pFD, offset, pWriter->aBuf[3], aBufN[3]);
if (code) goto _err;
offset += aBufN[3];
code = tsdbWriteFile(pFD, pWriter->aBuf[2], aBufN[2], NULL);
code = tsdbWriteFile(pFD, offset, pWriter->aBuf[2], aBufN[2]);
if (code) goto _err;
offset += aBufN[2];
if (aBufN[1]) {
code = tsdbWriteFile(pFD, pWriter->aBuf[1], aBufN[1], NULL);
code = tsdbWriteFile(pFD, offset, pWriter->aBuf[1], aBufN[1]);
if (code) goto _err;
offset += aBufN[1];
}
if (aBufN[0]) {
code = tsdbWriteFile(pFD, pWriter->aBuf[0], aBufN[0], NULL);
code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], aBufN[0]);
if (code) goto _err;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册