提交 25a2d04b 编写于 作者: H Hongze Cheng

more vnode snapshot writer

上级 c5133de7
......@@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
int32_t tsdbLastrowReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
......@@ -135,6 +135,7 @@ int32_t tGetColData(uint8_t *p, SColData *pColData);
int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema);
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataClearData(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
......
......@@ -383,11 +383,10 @@ struct STsdbSnapWriter {
int32_t iRow;
SDataFWriter* pDataFWriter;
SBlockIdx* pBlockIdxW;
SBlockIdx blockIdx;
SBlock* pBlockW;
SBlockIdx* pBlockIdxW; // NULL when no committing table
SBlock blockW;
SBlockData bDataW;
SBlockIdx blockIdxW;
SMapData mBlockW; // SMapData<SBlock>
SArray* aBlockIdxW; // SArray<SBlockIdx>
......@@ -455,110 +454,40 @@ _err:
return code;
}
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWrite) {
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
#if 0
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
TABLEID id = {0}; // TODO
ASSERT(pWriter->pBlockIdxW != NULL);
// skip
while (pWriter->pBlockIdx && tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) {
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
pWriter->iBlockIdx++;
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
// write remain data if has
if (pWriter->bDataW.nRow > 0) {
if (pWriter->bDataW.nRow >= pWriter->minRow) {
pWriter->blockW.last = 0;
} else {
pWriter->pBlockIdx = NULL;
}
}
// new or merge
if (pWriter->pBlockIdx == NULL || tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) {
int32_t c;
if (pWriter->pBlockIdxW && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxW)) != 0)) {
ASSERT(c > 0);
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
}
if (pWriter->pBlockIdxW == NULL) {
pWriter->pBlockIdx = &pWriter->blockIdx;
pWriter->pBlockIdx->suid = id.suid;
pWriter->pBlockIdx->uid = id.uid;
}
// loop to write the data
TSDBROW* pRow = NULL; // todo
int32_t nRow = 0; // todo
SBlockData* pBlockData = NULL; // todo
for (int32_t iRow = 0; iRow < nRow; iRow++) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
if (pWriter->bDataW.nRow > pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
pWriter->pBlockW, pWriter->cmprAlg);
if (code) goto _err;
}
}
} else {
// skip
while (true) {
if (pWriter->pBlock == NULL) break;
if (pWriter->pBlock->last) break;
if (tBlockCmprFn(&(SBlock){.minKey = {0}, .maxKey = {0}}, pWriter->pBlock) >= 0) break;
code = tMapDataPutItem(&pWriter->mBlockW, pWriter->pBlock, tPutBlock);
if (code) goto _err;
pWriter->blockW.last = 1;
}
if (pWriter->pBlock) {
if (pWriter->pBlock->last) {
// load the last block and merge with the data (todo)
} else {
int32_t c = tBlockCmprFn(&(SBlock){0 /*TODO*/}, pWriter->pBlock);
if (c > 0) {
// commit until pWriter->pBlock (todo)
} else {
// load the block and merge with the data (todo)
}
}
} else {
int32_t nRow = 0;
SBlockData* pBlockData = NULL;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
for (int32_t iRow = 0; iRow < nRow; iRow++) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
}
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
pWriter->pBlockW, pWriter->cmprAlg);
if (code) goto _err;
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
if (code) goto _err;
tBlockDataClearData(&pWriter->bDataW);
}
}
}
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write table data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
#endif
static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
int32_t code = 0;
......@@ -567,11 +496,16 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
if (pWriter->pDataFReader == NULL) {
// no old data
// end last table data commit if id not same
// end last table write if need
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id);
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
if (c < 0) {
// commit last table data and reset (todo)
// end last table data write
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
// reset
pWriter->pBlockIdxW = NULL;
} else if (c > 0) {
ASSERT(0);
......@@ -580,41 +514,40 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) {
// start a new table data if need
if (pWriter->pBlockIdxW == NULL) {
pWriter->pBlockIdxW = &pWriter->blockIdx;
pWriter->pBlockIdxW = &pWriter->blockIdxW;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
pWriter->pBlockW = &pWriter->blockW;
tBlockReset(pWriter->pBlockW);
tBlockReset(&pWriter->blockW);
tBlockDataReset(&pWriter->bDataW);
tMapDataReset(&pWriter->mBlockW);
}
// set block schema (todo)
// set block schema
code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
if (code) goto _err;
// add rows
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBROW* pRow = &tsdbRowFromBlockData(pBlockData, iRow);
TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow);
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
code = tBlockDataAppendRow(&pWriter->bDataW, &row, NULL);
if (code) goto _err;
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
// write the block to file
pWriter->pBlockW->last = 0;
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
pWriter->pBlockW, pWriter->cmprAlg);
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, pWriter->pBlockW, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
// reset
tBlockReset(pWriter->pBlockW);
tBlockDataReset(&pWriter->bDataW);
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
}
}
} else {
......@@ -647,6 +580,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
// end last file data write if need
code = tsdbSnapWriteDataEnd(pWriter); // todo
if (code) goto _err;
......@@ -697,7 +631,6 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
taosArrayClear(pWriter->aBlockIdxW);
pWriter->pBlockIdxW = NULL;
tMapDataReset(&pWriter->mBlockW);
pWriter->pBlockW = NULL;
tBlockDataReset(&pWriter->bDataW);
}
......@@ -709,7 +642,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -1132,6 +1132,46 @@ _err:
return code;
}
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom) {
int32_t code = 0;
int32_t iColData = 0;
for (int32_t iColDataFrom = 0; iColDataFrom < taosArrayGetSize(pBlockDataFrom->aIdx); iColDataFrom++) {
SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColDataFrom);
while (true) {
SColData *pColData;
if (iColData < taosArrayGetSize(pBlockData->aIdx)) {
pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
} else {
pColData = NULL;
}
if (pColData == NULL || pColData->cid > pColDataFrom->cid) {
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColDataFrom->cid, pColData->type, pColData->smaOn);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _exit;
}
iColData++;
break;
} else if (pColData->cid == pColDataFrom->cid) {
iColData++;
break;
} else {
iColData++;
}
}
}
_exit:
return code;
}
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) {
int32_t code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册