提交 7e458785 编写于 作者: H Hongze Cheng

more vnode snapshot

上级 74b31e8c
......@@ -488,52 +488,42 @@ _err:
return code;
}
static int32_t tsdbSnapWriteTableDataAhead(STsdbSnapWriter* pWriter, TABLEID id) {
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) {
int32_t code = 0;
if (pWriter->pDataFReader == NULL) goto _exit;
while (true) {
if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break;
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
if (c >= 0) break;
pWriter->iBlockIdx++;
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
SBlock block;
tMapDataReset(&pWriter->mBlockW);
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock);
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
if (block.last) {
code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
if (code) goto _err;
// SBlockData
SBlock block;
tMapDataReset(&pWriter->mBlockW);
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock);
tBlockReset(&block);
block.last = 1;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block,
pWriter->cmprAlg);
if (code) goto _err;
}
if (block.last) {
code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
tBlockReset(&block);
block.last = 1;
code =
tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, pWriter->cmprAlg);
if (code) goto _err;
}
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
}
if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// SBlock
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx);
if (code) goto _err;
// SBlockIdx
if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
_exit:
......@@ -543,77 +533,19 @@ _err:
return code;
}
static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
// TABLE ====================================
// end last table write if should
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
if (c < 0) {
// end
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
int32_t iRow = 0;
TSDBROW row;
TSDBROW* pRow = &row;
// reset
pWriter->pBlockIdxW = NULL;
} else if (c > 0) {
ASSERT(0);
}
}
// start new table data write if need
if (pWriter->pBlockIdxW == NULL) {
// write table data ahead
code = tsdbSnapWriteTableDataAhead(pWriter, id);
if (code) goto _err;
// reader
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
ASSERT(pWriter->pDataFReader);
pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock);
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id);
if (c) {
ASSERT(c > 0);
pWriter->pBlockIdx = NULL;
} else {
pWriter->iBlockIdx++;
}
} else {
pWriter->pBlockIdx = NULL;
}
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
} else {
tMapDataReset(&pWriter->mBlock);
}
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
// writer
pWriter->pBlockIdxW = &pWriter->blockIdxW;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
tBlockReset(&pWriter->blockW);
tBlockDataReset(&pWriter->bDataW);
tMapDataReset(&pWriter->mBlockW);
}
ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid);
ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid));
// correct schema
code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
if (code) goto _err;
// BLOCK ====================================
int32_t iRow = 0;
TSDBROW* pRow = &tsdbRowFromBlockData(pBlockData, iRow);
// loop to merge
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
while (true) {
if (pRow == NULL) break;
......@@ -630,7 +562,7 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
iRow++;
if (iRow < pWriter->pBlockData->nRow) {
pRow = &tsdbRowFromBlockData(pBlockData, iRow);
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
} else {
pRow = NULL;
}
......@@ -644,55 +576,60 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
}
}
} else {
SBlock block;
TSDBKEY key = TSDBROW_KEY(pRow);
if (pWriter->iBlock < pWriter->mBlock.nItem) {
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
while (true) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
SBlock block;
int32_t c;
c = tsdbKeyCmprFn(&block.maxKey, &TSDBROW_KEY(pRow));
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
ASSERT(c);
if (block.last) {
pWriter->pBlockData = &pWriter->bDataR;
if (c < 0) {
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
if (code) goto _err;
pWriter->iRow = 0;
pWriter->iBlock++;
continue;
break;
}
c = tsdbKeyCmprFn(&block.minKey, &TSDBROW_KEY(pRow));
c = tsdbKeyCmprFn(&block.maxKey, &key);
ASSERT(c);
if (c > 0) {
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (c > 0) break;
if (pWriter->bDataW.nRow) {
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
iRow++;
if (iRow < pWriter->pBlockData->nRow) {
pRow = &tsdbRowFromBlockData(pBlockData, iRow);
} else {
pRow = NULL;
}
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
goto _check_write;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
}
pWriter->pBlockData = &pWriter->bDataR;
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
pWriter->iRow = 0;
pWriter->iBlock++;
}
if (pWriter->pBlockData) continue;
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (code) goto _err;
iRow++;
if (iRow < pWriter->pBlockData->nRow) {
pRow = &tsdbRowFromBlockData(pBlockData, iRow);
if (iRow < pBlockData->nRow) {
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
} else {
pRow = NULL;
}
......@@ -713,6 +650,91 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
tBlockDataClearData(&pWriter->bDataW);
}
return code;
_err:
return code;
}
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
// end last table write if should
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
if (c < 0) {
// end
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
// reset
pWriter->pBlockIdxW = NULL;
} else if (c > 0) {
ASSERT(0);
}
}
// start new table data write if need
if (pWriter->pBlockIdxW == NULL) {
// write table data ahead
while (true) {
if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break;
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
if (c >= 0) break;
code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx);
if (code) goto _err;
pWriter->iBlockIdx++;
}
// reader
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
ASSERT(pWriter->pDataFReader);
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock);
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
ASSERT(c >= 0);
if (c == 0) {
pWriter->pBlockIdx = pBlockIdx;
pWriter->iBlockIdx++;
}
}
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
} else {
tMapDataReset(&pWriter->mBlock);
}
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
// writer
pWriter->pBlockIdxW = &pWriter->blockIdxW;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
tBlockReset(&pWriter->blockW);
tBlockDataReset(&pWriter->bDataW);
tMapDataReset(&pWriter->mBlockW);
}
ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid);
ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid));
code = tsdbSnapWriteTableDataImpl(pWriter);
if (code) goto _err;
_exit:
return code;
......@@ -723,11 +745,10 @@ _err:
}
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TABLEID id = *(TABLEID*)(&pHdr[1]);
int64_t n;
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
int64_t n;
// decode
SBlockData* pBlockData = &pWriter->bData;
......@@ -742,13 +763,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
// end last file data write if need
code = tsdbSnapWriteDataEnd(pWriter); // todo
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err;
pWriter->fid = fid;
// read
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); // todo: check nState is valid
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ);
if (pSet) {
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err;
......@@ -763,8 +784,9 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
pWriter->pBlockIdx = NULL;
tMapDataReset(&pWriter->mBlock);
pWriter->iBlock = 0;
tBlockDataReset(&pWriter->bDataR);
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
tBlockDataReset(&pWriter->bDataR);
// write
SDFileSet wSet;
......@@ -789,12 +811,12 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
if (code) goto _err;
taosArrayClear(pWriter->aBlockIdxW);
pWriter->pBlockIdxW = NULL;
tMapDataReset(&pWriter->mBlockW);
pWriter->pBlockIdxW = NULL;
tBlockDataReset(&pWriter->bDataW);
}
code = tsdbSnapWriteDataImpl(pWriter, id);
code = tsdbSnapWriteTableData(pWriter, id);
if (code) goto _err;
tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册