提交 0359254b 编写于 作者: H Hongze Cheng

more work

上级 607f698f
......@@ -101,6 +101,7 @@ int32_t tPutBlock(uint8_t *p, void *ph);
int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
// SBlockIdx
#define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()})
int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph);
// SColdata
......
......@@ -463,7 +463,8 @@ _err:
return code;
}
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) {
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
int8_t isLastBlock) {
int32_t code = 0;
TSDBROW *pRow;
SBlock block = tBlockInit();
......@@ -523,12 +524,18 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
STbDataIter iter;
STbDataIter *pIter = &iter;
TSDBROW *pRow;
SBlockIdx blockIdx; // TODO
int64_t suid;
int64_t uid;
SBlockIdx blockIdx;
// create iter
if (pTbData) {
suid = pTbData->suid;
uid = pTbData->uid;
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
} else {
suid = pBlockIdx->suid;
uid = pBlockIdx->uid;
pIter = NULL;
}
......@@ -538,23 +545,27 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// start ================================
tMapDataReset(&pCommitter->oBlockMap);
tMapDataReset(&pCommitter->nBlockMap);
tBlockReset(&pCommitter->oBlock);
tBlockDataReset(&pCommitter->oBlockData);
if (pBlockIdx) {
code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
if (code) goto _err;
}
blockIdx = tBlockIdxInit(suid, uid);
tMapDataReset(&pCommitter->nBlockMap);
tBlockReset(&pCommitter->nBlock);
tBlockDataReset(&pCommitter->nBlockData);
// impl ===============================
SBlock block;
SBlock *pBlock = █
int32_t iBlock = 0;
int32_t nBlockMap = pCommitter->oBlockMap.nItem;
int32_t nBlock = pCommitter->oBlockMap.nItem;
// merge
pRow = tsdbTbDataIterGet(pIter);
while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlockMap) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, pBlock);
while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock);
code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, &pCommitter->oBlock, iBlock == (nBlock - 1));
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
......@@ -564,17 +575,17 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// mem
pRow = tsdbTbDataIterGet(pIter);
while (!ROW_END(pRow, pCommitter->maxKey)) {
code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL);
code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
}
// disk
while (iBlock < nBlockMap) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
while (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock);
code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, pBlock);
code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, &pCommitter->oBlock, 0);
if (code) goto _err;
iBlock++;
......
......@@ -489,17 +489,17 @@ _err:
return code;
}
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pBlockIdx->offset;
int64_t size = pBlockIdx->size;
int64_t n;
uint32_t delimiter;
tb_uid_t suid;
tb_uid_t uid;
int64_t suid;
int64_t uid;
// alloc
if (!ppBuf) ppBuf = &mBlockIdx->pBuf;
if (!ppBuf) ppBuf = &mBlock->pBuf;
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
......@@ -533,7 +533,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
ASSERT(suid == pBlockIdx->suid);
n += tGetI64(*ppBuf + n, &uid);
ASSERT(uid == pBlockIdx->uid);
n += tGetMapData(*ppBuf + n, mBlockIdx);
n += tGetMapData(*ppBuf + n, mBlock);
ASSERT(n + sizeof(TSCKSUM) == size);
return code;
......@@ -625,20 +625,112 @@ _err:
}
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
int32_t code = 0;
// TODO
int32_t code = 0;
int64_t size = TSDB_FHDR_SIZE;
int64_t n;
uint8_t *pBuf = NULL;
SHeadFile *pHeadFile = pWriter->pSet->pHeadFile;
SDataFile *pDataFile = pWriter->pSet->pDataFile;
SLastFile *pLastFile = pWriter->pSet->pLastFile;
SSmaFile *pSmaFile = pWriter->pSet->pSmaFile;
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
// head ==============
// build
memset(*ppBuf, 0, size);
// tPutHeadFileHdr(*ppBuf, pHeadFile);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
if (taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// data ==============
memset(*ppBuf, 0, size);
// tPutDataFileHdr(*ppBuf, pDataFile);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
if (taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pDataFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// last ==============
memset(*ppBuf, 0, size);
// tPutLastFileHdr(*ppBuf, pLastFile);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
if (taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pLastFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// sma ==============
memset(*ppBuf, 0, size);
// tPutSmaFileHdr(*ppBuf, pSmaFile);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
if (taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pSmaFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
tsdbFree(pBuf);
return code;
_err:
tsdbFree(pBuf);
tsdbError("vgId:%d update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pBlockIdxMap, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size = 0;
int64_t n = 0;
uint8_t *pBuf = NULL;
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size = 0;
SHeadFile *pHeadFile = pWriter->pSet->pHeadFile;
int64_t n = 0;
uint8_t *pBuf = NULL;
// prepare
size += tPutU32(NULL, TSDB_FILE_DLMT);
size = size + tPutMapData(NULL, pBlockIdxMap) + sizeof(TSCKSUM);
size = size + tPutMapData(NULL, mBlockIdx) + sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
......@@ -647,7 +739,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pBlockIdxMap, uint8_t
// build
n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutMapData(*ppBuf, pBlockIdxMap);
n += tPutMapData(*ppBuf, mBlockIdx);
taosCalcChecksumAppend(0, *ppBuf, size);
ASSERT(n + sizeof(TSCKSUM) == size);
......@@ -659,7 +751,9 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pBlockIdxMap, uint8_t
goto _err;
}
// update (todo)
// update
pHeadFile->offset = pHeadFile->size;
pHeadFile->size += size;
tsdbFree(pBuf);
return code;
......@@ -670,20 +764,21 @@ _err:
return code;
}
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pBlockMap, uint8_t **ppBuf, SBlockIdx *pBlockIdx) {
int32_t code = 0;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SHeadFile *pHeadFile = pWriter->pSet->pHeadFile;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
ASSERT(pBlockMap->nItem > 0);
ASSERT(mBlock->nItem > 0);
// prepare
size = 0;
size += tPutU32(NULL, TSDB_FILE_DLMT);
size += tPutI64(NULL, pBlockIdx->suid);
size += tPutI64(NULL, pBlockIdx->uid);
size = size + tPutMapData(NULL, pBlockMap) + sizeof(TSCKSUM);
size = size + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
......@@ -695,7 +790,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pBlockMap, uint8_t **ppB
n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutI64(*ppBuf + n, pBlockIdx->suid);
n += tPutI64(*ppBuf + n, pBlockIdx->uid);
n += tPutMapData(*ppBuf + n, pBlockMap);
n += tPutMapData(*ppBuf + n, mBlock);
taosCalcChecksumAppend(0, *ppBuf, size);
ASSERT(n + sizeof(TSCKSUM) == size);
......@@ -707,16 +802,18 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pBlockMap, uint8_t **ppB
goto _err;
}
// update (todo)
// pBlockIdx->offset = -1;
// update
pBlockIdx->offset = pHeadFile->size;
pBlockIdx->size = size;
// pWriter->pSet->pHeadF.offset
pHeadFile->size += size;
tsdbFree(pBuf);
tsdbTrace("vgId:%d write block, offset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), pBlockIdx->offset,
pBlockIdx->size);
return code;
_err:
tsdbFree(pBuf);
tsdbError("vgId:%d write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -298,14 +298,40 @@ void tBlockClear(SBlock *pBlock) {
int32_t tPutBlock(uint8_t *p, void *ph) {
int32_t n = 0;
SBlock *pBlock = (SBlock *)ph;
// TODO
n += tPutKEYINFO(p ? p + n : p, &pBlock->info);
n += tPutI32v(p ? p + n : p, pBlock->nRow);
n += tPutI8(p ? p + n : p, pBlock->last);
n += tPutI8(p ? p + n : p, pBlock->hasDup);
n += tPutI8(p ? p + n : p, pBlock->cmprAlg);
n += tPutI8(p ? p + n : p, pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize);
n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol);
}
return n;
}
int32_t tGetBlock(uint8_t *p, void *ph) {
int32_t n = 0;
SBlock *pBlock = (SBlock *)ph;
// TODO
n += tGetKEYINFO(p + n, &pBlock->info);
n += tGetI32v(p + n, &pBlock->nRow);
n += tGetI8(p + n, &pBlock->last);
n += tGetI8(p + n, &pBlock->hasDup);
n += tGetI8(p + n, &pBlock->cmprAlg);
n += tGetI8(p + n, &pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize);
n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol);
}
return n;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册