提交 ec2a0691 编写于 作者: H Hongze Cheng

more vnode snapshot writer

上级 5b5b6a03
......@@ -377,10 +377,9 @@ struct STsdbSnapWriter {
SBlockIdx* pBlockIdx;
SMapData mBlock; // SMapData<SBlock>
int32_t iBlock;
SBlock* pBlock;
SBlock block;
SBlockData bDataR;
SBlockData* pBlockData;
int32_t iRow;
SBlockData bDataR;
SDataFWriter* pDataFWriter;
SBlockIdx* pBlockIdxW; // NULL when no committing table
......@@ -550,127 +549,171 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
if (pWriter->pDataFReader == NULL) {
// no old data
// TABLE ====================================
// end last table write if need
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
// end last table write if should
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
if (c < 0) {
// end
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
if (c < 0) {
// end last table data write
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
// reset
pWriter->pBlockIdxW = NULL;
} else if (c > 0) {
ASSERT(0);
}
}
// reset
pWriter->pBlockIdxW = NULL;
} else if (c > 0) {
ASSERT(0);
// start new table data write if need
if (pWriter->pBlockIdxW == NULL) {
// write table data ahead
code = tsdbSnapWriteTableDataAhead(pWriter, id);
if (code) goto _err;
// reader
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
ASSERT(pWriter->pDataFReader);
pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock);
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id);
if (c) {
ASSERT(c > 0);
pWriter->pBlockIdx = NULL;
} else {
pWriter->iBlockIdx++;
}
} else {
pWriter->pBlockIdx = NULL;
}
// start a new table data if need
if (pWriter->pBlockIdxW == NULL) {
pWriter->pBlockIdxW = &pWriter->blockIdxW;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
} else {
tMapDataReset(&pWriter->mBlock);
}
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
tBlockReset(&pWriter->blockW);
// writer
pWriter->pBlockIdxW = &pWriter->blockIdxW;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
tBlockDataReset(&pWriter->bDataW);
tBlockReset(&pWriter->blockW);
tBlockDataReset(&pWriter->bDataW);
tMapDataReset(&pWriter->mBlockW);
}
tMapDataReset(&pWriter->mBlockW);
}
ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid);
ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid));
// set block schema
code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
if (code) goto _err;
// BLOCK ====================================
int32_t iRow = 0;
TSDBROW* pRow = &tsdbRowFromBlockData(pBlockData, iRow);
while (true) {
if (pRow == NULL) break;
// add rows
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow);
if (pWriter->pBlockData) {
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
code = tBlockDataAppendRow(&pWriter->bDataW, &row, NULL);
if (code) goto _err;
int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow));
ASSERT(c);
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (c < 0) {
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
iRow++;
if (iRow < pWriter->pBlockData->nRow) {
pRow = &tsdbRowFromBlockData(pBlockData, iRow);
} else {
pRow = NULL;
}
} else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
if (code) goto _err;
// reset
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
pWriter->iRow++;
if (pWriter->iRow >= pWriter->pBlockData->nRow) {
pWriter->pBlockData = NULL;
}
}
}
} else {
// has old data
} else {
SBlock block;
// TABLE ==================================================
// end last table data if id not same (todo)
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
if (c < 0) {
} else if (c > 0) {
ASSERT(0);
}
}
if (pWriter->iBlock < pWriter->mBlock.nItem) {
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
int32_t c;
// start new table data if need (todo)
if (pWriter->pBlockIdxW == NULL) {
// commit table data ahead
code = tsdbSnapWriteTableDataAhead(pWriter, id);
if (code) goto _err;
c = tsdbKeyCmprFn(&block.maxKey, &TSDBROW_KEY(pRow));
ASSERT(c);
// reader
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id);
if (c) {
pWriter->pBlockIdx = NULL;
if (c < 0) {
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
pWriter->iBlock++;
continue;
}
} else {
pWriter->pBlockIdx = NULL;
}
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlockW, NULL);
c = tsdbKeyCmprFn(&block.minKey, &TSDBROW_KEY(pRow));
ASSERT(c);
if (c > 0) {
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (code) goto _err;
iRow++;
if (iRow < pWriter->pBlockData->nRow) {
pRow = &tsdbRowFromBlockData(pBlockData, iRow);
} else {
pRow = NULL;
}
goto _check_write;
}
pWriter->pBlockData = &pWriter->bDataR;
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
if (code) goto _err;
pWriter->iBlock = 0;
pWriter->iRow = 0;
}
// writer
pWriter->pBlockIdxW = &pWriter->blockIdxW;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataReset(&pWriter->bDataW);
tMapDataReset(&pWriter->mBlockW);
iRow++;
if (iRow < pWriter->pBlockData->nRow) {
pRow = &tsdbRowFromBlockData(pBlockData, iRow);
} else {
pRow = NULL;
}
}
// BLOCK ==================================================
// write block ahead
while (true) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
_check_write:
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;
SBlock block;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
if (tsdbKeyCmprFn(&block.maxKey, &keyFirst) >= 0) break;
_write_block:
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW,
pWriter->cmprAlg);
if (code) goto _err;
pWriter->iBlock++;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
}
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
}
_exit:
return code;
_err:
......@@ -720,7 +763,6 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
pWriter->pBlockIdx = NULL;
tMapDataReset(&pWriter->mBlock);
pWriter->iBlock = 0;
pWriter->pBlock = NULL;
tBlockDataReset(&pWriter->bDataR);
pWriter->iRow = 0;
......
......@@ -36,15 +36,15 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u
// alloc
code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
if (code) goto _err;
if (code) goto _exit;
code = tRealloc(&pMapData->pData, pMapData->nData);
if (code) goto _err;
if (code) goto _exit;
// put
pMapData->aOffset[nItem] = offset;
tPutItemFn(pMapData->pData + offset, pItem);
_err:
_exit:
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册