提交 989159f0 编写于 作者: H Hongze Cheng

vnod snapshot done

上级 c9b0864e
...@@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo { ...@@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo {
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SFilesetIter { typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list int32_t index; // current accessed index in the list
SArray* pFileList; // data file list SArray* pFileList; // data file list
int32_t order; int32_t order;
} SFilesetIter; } SFilesetIter;
typedef struct SFileDataBlockInfo { typedef struct SFileDataBlockInfo {
...@@ -830,8 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -830,8 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
pBlockData, NULL, NULL); pBlockData, NULL, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1991,7 +1991,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead ...@@ -1991,7 +1991,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
while (1) { while (1) {
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader); bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
...@@ -3006,14 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { ...@@ -3006,14 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData, 1);
terrno = code; terrno = code;
return NULL; return NULL;
} }
copyBlockDataToSDataBlock(pReader, pBlockScanInfo); copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData, 1);
return pReader->pResBlock->pDataBlock; return pReader->pResBlock->pDataBlock;
} }
...@@ -3132,8 +3132,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa ...@@ -3132,8 +3132,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
hasNext = (pBlockIter->numOfBlocks > 0); hasNext = (pBlockIter->numOfBlocks > 0);
} }
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr); // pReader->pFileGroup->fid, pReader->idStr);
} }
return code; return code;
...@@ -3204,4 +3204,3 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 ...@@ -3204,4 +3204,3 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -411,30 +411,6 @@ static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) { ...@@ -411,30 +411,6 @@ static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) {
return code; return code;
} }
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDataFWriter == NULL) goto _exit;
// TODO
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 0);
if (code) goto _err;
if (pWriter->pDataFReader) {
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
if (code) goto _err;
}
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb snapshot writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
int32_t iRow = 0; // todo int32_t iRow = 0; // todo
...@@ -456,14 +432,41 @@ _err: ...@@ -456,14 +432,41 @@ _err:
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
ASSERT(pWriter->pBlockIdxW != NULL); ASSERT(pWriter->pDataFWriter);
if (pWriter->pBlockIdxW == NULL) goto _exit;
// consume remain rows
if (pWriter->pBlockData) {
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
while (pWriter->iRow < pWriter->pBlockData->nRow) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
if (code) goto _err;
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
}
pWriter->iRow++;
}
}
// write remain data if has // write remain data if has
if (pWriter->bDataW.nRow > 0) { if (pWriter->bDataW.nRow > 0) {
if (pWriter->bDataW.nRow >= pWriter->minRow) { pWriter->blockW.last = 0;
pWriter->blockW.last = 0; if (pWriter->bDataW.nRow < pWriter->minRow) {
} else { if (pWriter->iBlock > pWriter->mBlock.nItem) {
pWriter->blockW.last = 1; pWriter->blockW.last = 1;
}
} }
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
...@@ -474,14 +477,40 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { ...@@ -474,14 +477,40 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
if (code) goto _err; if (code) goto _err;
} }
while (true) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
SBlock block;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
if (block.last) {
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
if (code) goto _err;
tBlockReset(&block);
block.last = 1;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
pWriter->cmprAlg);
if (code) goto _err;
}
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
pWriter->iBlock++;
}
// SBlock
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
if (code) goto _err; if (code) goto _err;
// SBlockIdx
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) { if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
_exit:
return code; return code;
_err: _err:
...@@ -601,25 +630,39 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { ...@@ -601,25 +630,39 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
ASSERT(c); ASSERT(c);
if (c > 0) break; if (c < 0) {
if (pWriter->bDataW.nRow) {
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
if (pWriter->bDataW.nRow) { code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
pWriter->blockW.last = 0; if (code) goto _err;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg); tBlockReset(&pWriter->blockW);
if (code) goto _err; tBlockDataClearData(&pWriter->bDataW);
}
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err; if (code) goto _err;
tBlockReset(&pWriter->blockW); pWriter->iBlock++;
tBlockDataClearData(&pWriter->bDataW); } else {
} c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); ASSERT(c);
if (code) goto _err;
if (c < 0) break;
pWriter->pBlockData = &pWriter->bDataR;
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
if (code) goto _err;
pWriter->iRow = 0;
pWriter->iBlock++; pWriter->iBlock++;
break;
}
} }
if (pWriter->pBlockData) continue; if (pWriter->pBlockData) continue;
...@@ -744,6 +787,45 @@ _err: ...@@ -744,6 +787,45 @@ _err:
return code; return code;
} }
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDataFWriter == NULL) goto _exit;
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx));
if (code) goto _err;
pWriter->iBlockIdx++;
}
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
if (code) goto _err;
code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter));
if (code) goto _err;
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
if (code) goto _err;
if (pWriter->pDataFReader) {
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
if (code) goto _err;
}
_exit:
tsdbError("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode));
return code;
_err:
tsdbError("vgId:%d vnode snapshot tsdb writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
...@@ -845,23 +927,23 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 ...@@ -845,23 +927,23 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
} }
// writer // writer
SDelFile delFile = {.commitID = pTsdb->pVnode->state.commitID, .offset = 0, .size = 0}; SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0};
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
if (code) goto _err; if (code) goto _err;
} }
// process the del data // process the del data
TABLEID id = {0}; // todo TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
while (true) { while (true) {
SDelIdx* pDelIdx = NULL; SDelIdx* pDelIdx = NULL;
int64_t n = 0; int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
SDelData delData; SDelData delData;
SDelIdx delIdx; SDelIdx delIdx;
int8_t toBreak = 0; int8_t toBreak = 0;
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) { if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) {
pDelIdx = taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
} }
if (pDelIdx) { if (pDelIdx) {
...@@ -914,7 +996,7 @@ _exit: ...@@ -914,7 +996,7 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -923,6 +1005,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -923,6 +1005,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) goto _exit; if (pWriter->pDelFWriter == NULL) goto _exit;
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) { for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
...@@ -954,10 +1037,11 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -954,10 +1037,11 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
} }
_exit: _exit:
tsdbInfo("vgId:%d vnode snapshot tsdb write del end", TD_VID(pTsdb->pVnode));
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshow write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -1076,7 +1160,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1076,7 +1160,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
// del data // del data
if (pHdr->type == 2) { if (pHdr->type == 2) {
code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1); code = tsdbSnapWriteDel(pWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册