提交 d31459dc 编写于 作者: H Hongze Cheng

more code

上级 ffc38e30
...@@ -78,7 +78,7 @@ typedef struct { ...@@ -78,7 +78,7 @@ typedef struct {
SRBTree rbt; SRBTree rbt;
SDataIter dataIter; SDataIter dataIter;
SDataIter aDataIter[TSDB_MAX_LAST_FILE]; SDataIter aDataIter[TSDB_MAX_LAST_FILE];
int8_t toLast; int8_t toLastOnly;
}; };
struct { struct {
SDataFWriter *pWriter; SDataFWriter *pWriter;
...@@ -403,30 +403,31 @@ _exit: ...@@ -403,30 +403,31 @@ _exit:
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);
pCommitter->pIter = NULL; pCommitter->pIter = NULL;
tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);
// memory // memory
TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
SDataIter *pIter = &pCommitter->dataIter; SDataIter *pIter = &pCommitter->dataIter;
pIter->type = MEMORY_DATA_ITER; pIter->type = MEMORY_DATA_ITER;
pIter->iTbDataP = 0; pIter->iTbDataP = 0;
for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) { for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP); STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter); tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter);
TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter); TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
continue; pRow = NULL;
} }
if (pRow == NULL) continue;
pIter->r.suid = pTbData->suid; pIter->r.suid = pTbData->suid;
pIter->r.uid = pTbData->uid; pIter->r.uid = pTbData->uid;
pIter->r.row = *pRow; pIter->r.row = *pRow;
break; break;
} }
ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
// disk // disk
...@@ -448,17 +449,22 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -448,17 +449,22 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, iLast, pBlockL, &pIter->bData); code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, iLast, pBlockL, &pIter->bData);
if (code) goto _err; if (code) goto _err;
pIter->iRow = 0;
pIter->r.suid = pIter->bData.suid; pIter->r.suid = pIter->bData.suid;
pIter->r.uid = pIter->bData.uid; pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0); pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
iIter++; iIter++;
} }
pCommitter->toLast = 0; if (iIter > 0) {
pCommitter->toLastOnly = 1;
} else {
pCommitter->toLastOnly = 0;
}
} else { } else {
pCommitter->toLast = 1; pCommitter->toLastOnly = 0;
} }
code = tsdbNextCommitRow(pCommitter); code = tsdbNextCommitRow(pCommitter);
...@@ -482,8 +488,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -482,8 +488,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->nextKey = TSKEY_MAX; pCommitter->nextKey = TSKEY_MAX;
// Reader // Reader
pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid}, SDFileSet tDFileSet = {.fid = pCommitter->commitFid};
tDFileSetCmprFn, TD_EQ); pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ);
if (pRSet) { if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet); code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
if (code) goto _err; if (code) goto _err;
...@@ -493,10 +499,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -493,10 +499,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
if (code) goto _err; if (code) goto _err;
pCommitter->dReader.iBlockIdx = 0; pCommitter->dReader.iBlockIdx = 0;
if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) { if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
pCommitter->dReader.pBlockIdx = pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _err; if (code) goto _err;
} else { } else {
...@@ -508,47 +512,32 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -508,47 +512,32 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
} }
// Writer // Writer
SHeadFile fHead; SHeadFile fHead = {.commitID = pCommitter->commitID};
SDataFile fData; SDataFile fData = {.commitID = pCommitter->commitID};
SSmaFile fSma; SSmaFile fSma = {.commitID = pCommitter->commitID};
SLastFile fLast; SLastFile fLast = {.commitID = pCommitter->commitID};
SDFileSet wSet = {0}; SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
if (pRSet) { if (pRSet) {
ASSERT(pCommitter->maxLast == 1 || pRSet->nLastF < pCommitter->maxLast); ASSERT(pRSet->nLastF <= pCommitter->maxLast);
fHead = (SHeadFile){.commitID = pCommitter->commitID};
fData = *pRSet->pDataF; fData = *pRSet->pDataF;
fSma = *pRSet->pSmaF; fSma = *pRSet->pSmaF;
fLast = (SLastFile){.commitID = pCommitter->commitID};
wSet.diskId = pRSet->diskId; wSet.diskId = pRSet->diskId;
wSet.fid = pCommitter->commitFid; if (pRSet->nLastF < pCommitter->maxLast) {
wSet.pHeadF = &fHead; for (int32_t iLast = 0; iLast < pRSet->nLastF; iLast++) {
wSet.pDataF = &fData; wSet.aLastF[iLast] = pRSet->aLastF[iLast];
wSet.pSmaF = &fSma; }
for (int8_t iLast = 0; iLast < pRSet->nLastF; iLast++) { wSet.nLastF = pRSet->nLastF + 1;
wSet.aLastF[iLast] = pRSet->aLastF[iLast]; } else {
wSet.nLastF = 1;
} }
wSet.nLastF = pRSet->nLastF + 1;
wSet.aLastF[wSet.nLastF - 1] = &fLast; // todo
} else { } else {
fHead = (SHeadFile){.commitID = pCommitter->commitID};
fData = (SDataFile){.commitID = pCommitter->commitID};
fSma = (SSmaFile){.commitID = pCommitter->commitID};
fLast = (SLastFile){.commitID = pCommitter->commitID};
SDiskID did = {0}; SDiskID did = {0};
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
wSet.diskId = did; wSet.diskId = did;
wSet.fid = pCommitter->commitFid;
wSet.pHeadF = &fHead;
wSet.pDataF = &fData;
wSet.pSmaF = &fSma;
wSet.nLastF = 1; wSet.nLastF = 1;
wSet.aLastF[0] = &fLast;
} }
wSet.aLastF[wSet.nLastF - 1] = &fLast;
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
if (code) goto _err; if (code) goto _err;
...@@ -1722,6 +1711,9 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1722,6 +1711,9 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
pBlock = NULL; pBlock = NULL;
} }
} }
code = tsdbCommitterNextTableData(pCommitter);
if (code) goto _err;
} }
_exit: _exit:
...@@ -1791,34 +1783,20 @@ _err: ...@@ -1791,34 +1783,20 @@ _err:
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
SRowInfo *pRowInfo = NULL; SRowInfo *pRowInfo;
TABLEID id = {0}; TABLEID id = {0};
while (true) { while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) {
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo == NULL) {
/* end remain table data commit*/
code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX});
if (code) goto _err;
if (pCommitter->dWriter.bDatal.nRow > 0) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
break;
}
ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid); ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
/* start new table data commit */
id.suid = pRowInfo->suid; id.suid = pRowInfo->suid;
id.uid = pRowInfo->uid; id.uid = pRowInfo->uid;
// reader
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err; if (code) goto _err;
// writer
// start
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
// other
// impl
code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid);
if (code) goto _err; if (code) goto _err;
code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
...@@ -1834,6 +1812,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1834,6 +1812,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
code = tsdbCommitTableData(pCommitter, id); code = tsdbCommitTableData(pCommitter, id);
if (code) goto _err; if (code) goto _err;
// end
if (pCommitter->dWriter.mBlock.nItem > 0) { if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
...@@ -1846,6 +1825,17 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1846,6 +1825,17 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
} }
} }
id.suid = INT64_MAX;
id.uid = INT64_MAX;
code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err;
// TODO: here may have problem
if (pCommitter->dWriter.bDatal.nRow > 0) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
return code; return code;
_err: _err:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册