提交 2d0882ed 编写于 作者: H Hongze Cheng

more last file refact

上级 c2db42f1
......@@ -20,6 +20,12 @@ typedef struct {
STSchema *pTSchema;
} SSkmInfo;
typedef struct {
int64_t suid;
int64_t uid;
TSDBROW row;
} SRowInfo;
typedef struct {
STsdb *pTsdb;
/* commit data */
......@@ -38,11 +44,20 @@ typedef struct {
// commit file data
struct {
SDataFReader *pReader;
SArray *aBlockIdx; // SArray<SBlockIdx>
SArray *aBlockL; // SArray<SBlockL>
SMapData mBlock; // SMapData<SBlock>, read from reader
SBlockData bData;
SBlockData bDatal;
// data
SArray *aBlockIdx; // SArray<SBlockIdx>
int32_t iBlockIdx;
SBlockIdx *pBlockIdx;
SMapData mBlock; // SMapData<SBlock>
SBlockData bData;
// last
SArray *aBlockL; // SArray<SBlockL>
int32_t iBlockL;
SBlockL *pBlockL;
SBlockData bDatal;
int32_t iRow;
SRowInfo *pRow;
SRowInfo row;
} dReader;
struct {
SDataFWriter *pWriter;
......@@ -290,20 +305,46 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
if (code) goto _err;
// data
code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL);
if (code) goto _err;
pCommitter->dReader.iBlockIdx = 0;
if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code =
tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL);
if (code) goto _err;
} else {
pCommitter->dReader.pBlockIdx = NULL;
}
// last
code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL);
if (code) goto _err;
pCommitter->dReader.iBlockL = 0;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL);
// TODO: code = tsdbReadBlockData(pCommitter->dReader.pReader, NULL, pBlockL, &pCommitter->dReader.bDatal, NULL,
// NULL);
if (code) goto _err;
pCommitter->dReader.iRow = 0;
pCommitter->dReader.pRow = &pCommitter->dReader.row;
pCommitter->dReader.pRow->suid = pCommitter->dReader.pBlockL->suid;
pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow];
pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow);
} else {
pCommitter->dReader.pRow = NULL;
}
} else {
pCommitter->dReader.pReader = NULL;
taosArrayClear(pCommitter->dReader.aBlockIdx);
taosArrayClear(pCommitter->dReader.aBlockL);
pCommitter->dReader.pBlockIdx = NULL;
pCommitter->dReader.pRow = NULL;
}
tMapDataReset(&pCommitter->dReader.mBlock);
tBlockDataReset(&pCommitter->dReader.bData);
tBlockDataReset(&pCommitter->dReader.bDatal);
// Writer
SHeadFile fHead;
......@@ -707,7 +748,7 @@ _err:
return code;
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
int32_t code = 0;
STbDataIter iter = {0};
STbDataIter *pIter = &iter;
......@@ -716,6 +757,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
int32_t nBlock;
int64_t suid;
int64_t uid;
SBlockIdx *pBlockIdx = NULL;
if (pTbData) {
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
......@@ -908,6 +950,95 @@ _err:
return code;
}
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
int32_t code = 0;
// data
while (true) {
if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break;
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, NULL, &blockIdx);
if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pCommitter->dReader.iBlockIdx++;
if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code =
tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL);
if (code) goto _err;
} else {
pCommitter->dReader.pBlockIdx = NULL;
}
}
// last
SBlockL blockL;
while (true) {
if (pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, &toTable) >= 0) break;
// check if same suid
if (0) {
goto _write_block_data;
}
// append
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRow->row, NULL);
if (code) goto _err;
// next
pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) {
pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow];
pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow);
} else {
pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL);
// TODO: code = tsdbReadBlockData(pCommitter->dReader.pReader, NULL, pBlockL, &pCommitter->dReader.bDatal, NULL,
// NULL);
if (code) goto _err;
pCommitter->dReader.iRow = 0;
pCommitter->dReader.pRow->suid = pCommitter->dReader.pBlockL->suid;
pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow];
pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow);
} else {
pCommitter->dReader.pRow = NULL;
}
}
// write
if (pCommitter->dWriter.bDatal.nRow < pCommitter->maxRow) continue;
_write_block_data:
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, NULL, NULL, NULL, NULL,
pCommitter->cmprAlg); // todo
if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tBlockDataClearData(&pCommitter->dWriter.bDatal);
}
return code;
_err:
tsdbError("vgId:%d tsdb move commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
......@@ -918,59 +1049,32 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if (code) goto _err;
// commit file data impl
int32_t iTbData = 0;
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
int32_t iBlockIdx = 0;
int32_t nBlockIdx = taosArrayGetSize(pCommitter->dReader.aBlockIdx);
STbData *pTbData;
SBlockIdx *pBlockIdx;
ASSERT(nTbData > 0);
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
while (pTbData || pBlockIdx) {
if (pTbData && pBlockIdx) {
int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pMemTable->aTbData); iTbData++) {
STbData *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
if (c == 0) {
goto _commit_table_mem_and_disk;
} else if (c < 0) {
goto _commit_table_mem_data;
} else {
goto _commit_table_disk_data;
}
} else if (pBlockIdx) {
goto _commit_table_disk_data;
} else {
goto _commit_table_mem_data;
}
_commit_table_mem_data:
code = tsdbCommitTableData(pCommitter, pTbData, NULL);
// move commit until current (suid, uid)
code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = pTbData->suid, .uid = pTbData->uid});
if (code) goto _err;
iTbData++;
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
continue;
_commit_table_disk_data:
code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx);
// commit current table data commit
code = tsdbCommitTableData(pCommitter, pTbData);
if (code) goto _err;
}
iBlockIdx++;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
continue;
code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX});
if (code) goto _err;
if (pCommitter->dWriter.bDatal.nRow > 0) {
SBlockL blockL;
_commit_table_mem_and_disk:
code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx);
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, NULL, NULL, NULL, NULL,
pCommitter->cmprAlg);
if (code) goto _err;
iBlockIdx++;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
iTbData++;
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
continue;
if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
// commit file data end
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册