提交 f24317fb 编写于 作者: H Hongze Cheng

more last file refact

上级 2d0882ed
...@@ -570,6 +570,7 @@ _err: ...@@ -570,6 +570,7 @@ _err:
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) { static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
int32_t code = 0; int32_t code = 0;
#if 0
TSDBROW *pRow; TSDBROW *pRow;
SBlock block; SBlock block;
SBlock *pBlock = █ SBlock *pBlock = █
...@@ -623,6 +624,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -623,6 +624,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
tBlockDataClearData(pBlockData); tBlockDataClearData(pBlockData);
} }
#endif
return code; return code;
_err: _err:
...@@ -653,25 +655,25 @@ _err: ...@@ -653,25 +655,25 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) { // static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
int32_t code = 0; // int32_t code = 0;
SBlockIdx blockIdx = {.suid = suid, .uid = uid}; // SBlockIdx blockIdx = {.suid = suid, .uid = uid};
SBlockIdx *pBlockIdx = &blockIdx; // SBlockIdx *pBlockIdx = &blockIdx;
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx); // code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx);
if (code) goto _err; // if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) { // if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; // code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; // goto _err;
} // }
return code; // return code;
_err: // _err:
tsdbError("vgId:%d, commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); // tsdbError("vgId:%d, commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code; // return code;
} // }
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) { static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
int32_t nRow = 0; int32_t nRow = 0;
...@@ -748,6 +750,145 @@ _err: ...@@ -748,6 +750,145 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
int32_t code = 0;
ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0);
ASSERT(pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, pTbData) >= 0);
// end last if need
if (pTbData->suid == 0 || pTbData->suid != 0 /*todo*/) {
if (pCommitter->dWriter.bDatal.nRow > 0) {
// TODO: code = tsdbCommitBlockDataL(pCommitter);
if (code) goto _err;
}
}
// merge commit table data
STbDataIter iter = {0};
STbDataIter *pIter = &iter;
TSDBROW *pRow;
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
if (pRow == NULL) goto _exit;
SBlockIdx *pBlockIdx = NULL;
int32_t iBlock = 0;
SBlock block;
SBlock *pBlock = █
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) {
pBlockIdx = pCommitter->dReader.pBlockIdx;
}
if (pBlockIdx && iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
tMapDataReset(&pCommitter->dWriter.mBlock);
while (pBlock && pRow) {
int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)});
if (c < 0) { // disk
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
// next
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
} else if (c < 0) { // memory
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
if (code) goto _err;
// next
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
} else { // merge
int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
ASSERT(nOvlp > 0);
if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
if (code) goto _err;
} else {
// code = tsdbMergeTableData(pCommitter, pIter, pBlock, NULL, 1);
if (code) goto _err;
}
// next
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
}
}
while (pBlock) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
}
if (pRow) {
code =
tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
ASSERT(pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey);
}
// end
if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid};
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, &blockIdx);
if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
_exit:
pRow = tsdbTbDataIterGet(pIter);
if (pRow) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
}
return code;
_err:
tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
#if 0
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
int32_t code = 0; int32_t code = 0;
STbDataIter iter = {0}; STbDataIter iter = {0};
...@@ -913,6 +1054,7 @@ _err: ...@@ -913,6 +1054,7 @@ _err:
tsdbError("vgId:%d, tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d, tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
#endif
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
...@@ -985,8 +1127,14 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -985,8 +1127,14 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
if (pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, &toTable) >= 0) break; if (pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, &toTable) >= 0) break;
// check if same suid // check if same suid
if (0) { if (pCommitter->dReader.pRow->suid == 0) {
goto _write_block_data; if (pCommitter->dReader.pRow->uid != 0 /*todo*/) {
// code = tsdbCommitBlockDataL(pCommitter);
if (code) goto _err;
}
} else if (pCommitter->dReader.pRow->suid != 0 /*todo*/) {
// code = tsdbCommitBlockDataL(pCommitter);
if (code) goto _err;
} }
// append // append
...@@ -1017,19 +1165,10 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -1017,19 +1165,10 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
} }
// write // write
if (pCommitter->dWriter.bDatal.nRow < pCommitter->maxRow) continue; if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
// code = tsdbCommitBlockDataL(pCommitter);
_write_block_data:
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, NULL, NULL, NULL, NULL,
pCommitter->cmprAlg); // todo
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
} }
tBlockDataClearData(&pCommitter->dWriter.bDatal);
} }
return code; return code;
...@@ -1056,7 +1195,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { ...@@ -1056,7 +1195,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = pTbData->suid, .uid = pTbData->uid}); code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = pTbData->suid, .uid = pTbData->uid});
if (code) goto _err; if (code) goto _err;
// commit current table data commit // commit current table data
code = tsdbCommitTableData(pCommitter, pTbData); code = tsdbCommitTableData(pCommitter, pTbData);
if (code) goto _err; if (code) goto _err;
} }
...@@ -1065,16 +1204,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { ...@@ -1065,16 +1204,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if (code) goto _err; if (code) goto _err;
if (pCommitter->dWriter.bDatal.nRow > 0) { if (pCommitter->dWriter.bDatal.nRow > 0) {
SBlockL blockL; // code = tsdbCommitBlockDataL(pCommitter);
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, NULL, NULL, NULL, NULL,
pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
} }
// commit file data end // commit file data end
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册