提交 40dd9564 编写于 作者: H Hongze Cheng

more work

上级 0dbdc3d6
...@@ -138,7 +138,6 @@ void tColDataClear(void *ph); ...@@ -138,7 +138,6 @@ void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataPCmprFn(const void *p1, const void *p2);
// SBlockData // SBlockData
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) #define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
......
...@@ -415,10 +415,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB ...@@ -415,10 +415,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
TSDBROW *pRow1; TSDBROW *pRow1;
TSDBROW row2; TSDBROW row2;
TSDBROW *pRow2 = &row2; TSDBROW *pRow2 = &row2;
TSDBROW row;
TSDBROW *pRow = &row;
int32_t c = 0;
TSKEY lastKey;
// read SBlockData // read SBlockData
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL); code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
...@@ -432,7 +428,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB ...@@ -432,7 +428,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err; if (code) goto _err;
lastKey = TSKEY_MIN;
tBlockReset(pBlock); tBlockReset(pBlock);
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
while (true) { while (true) {
...@@ -445,68 +440,56 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB ...@@ -445,68 +440,56 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
} }
if (pRow1 && pRow2) { if (pRow1 && pRow2) {
if (tsdbRowCmprFn(pRow1, pRow2) < 0) { int32_t c = tsdbRowCmprFn(pRow1, pRow2);
*pRow = *pRow1; if (c < 0) {
goto _append_mem_row;
tsdbTbDataIterNext(pIter); } else if (c > 0) {
pRow1 = tsdbTbDataIterGet(pIter); goto _append_block_row;
if (pRow1) {
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_VERSION(pRow1));
if (code) goto _err;
} else {
pRow1 = NULL;
}
}
} else if (tsdbRowCmprFn(pRow1, pRow2) > 0) {
*pRow = *pRow2;
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
} else {
pRow2 = NULL;
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
} else if (pRow1) { } else if (pRow1) {
*pRow = *pRow1; goto _append_mem_row;
tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter);
if (pRow1) {
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_VERSION(pRow1));
if (code) goto _err;
} else {
pRow1 = NULL;
}
}
} else { } else {
*pRow = *pRow2; goto _append_block_row;
}
_append_mem_row:
code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->pTSchema);
if (code) goto _err;
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { tsdbTbDataIterNext(pIter);
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1); pRow1 = tsdbTbDataIterGet(pIter);
if (pRow1) {
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
} else { } else {
pRow2 = NULL; pRow1 = NULL;
} }
} }
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
goto _write_block;
} else {
continue;
}
_append_block_row:
code = tBlockDataAppendRow(pBlockData, pRow1, NULL);
if (code) goto _err; if (code) goto _err;
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow)); *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
pBlock->nRow++;
if (lastKey == TSDBROW_TS(pRow)) {
pBlock->hasDup = 1;
} else { } else {
lastKey = TSDBROW_TS(pRow); pRow2 = NULL;
} }
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
continue; goto _write_block;
} else {
continue;
}
_write_block: _write_block:
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) { if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
...@@ -521,7 +504,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB ...@@ -521,7 +504,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
lastKey = TSKEY_MIN;
tBlockReset(pBlock); tBlockReset(pBlock);
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
} }
...@@ -538,7 +520,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -538,7 +520,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
TSDBROW *pRow; TSDBROW *pRow;
SBlock *pBlock = &pCommitter->nBlock; SBlock *pBlock = &pCommitter->nBlock;
SBlockData *pBlockData = &pCommitter->nBlockData; SBlockData *pBlockData = &pCommitter->nBlockData;
TSKEY lastKey = TSKEY_MIN;
int64_t suid = pIter->pTbData->suid; int64_t suid = pIter->pTbData->suid;
int64_t uid = pIter->pTbData->uid; int64_t uid = pIter->pTbData->uid;
...@@ -563,16 +544,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -563,16 +544,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
if (code) goto _err; if (code) goto _err;
// update
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
pBlock->maxVersion = TMIN(pBlock->maxVersion, TSDBROW_VERSION(pRow));
pBlock->nRow++;
if (TSDBROW_TS(pRow) == lastKey) {
pBlock->hasDup = 1;
} else {
lastKey = TSDBROW_TS(pRow);
}
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL; if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
...@@ -596,7 +567,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -596,7 +567,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
tBlockReset(pBlock); tBlockReset(pBlock);
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
lastKey = TSKEY_MIN;
} }
return code; return code;
...@@ -614,12 +584,7 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S ...@@ -614,12 +584,7 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S
if (code) goto _err; if (code) goto _err;
tBlockReset(&pCommitter->nBlock); tBlockReset(&pCommitter->nBlock);
pCommitter->nBlock.minKey = pBlock->minKey;
pCommitter->nBlock.maxKey = pBlock->maxKey;
pCommitter->nBlock.minVersion = pBlock->minVersion;
pCommitter->nBlock.nRow = pBlock->nRow;
pCommitter->nBlock.last = pBlock->last; pCommitter->nBlock.last = pBlock->last;
pCommitter->nBlock.hasDup = pBlock->hasDup;
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock, code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
pCommitter->cmprAlg); pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
......
...@@ -176,7 +176,8 @@ static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet ...@@ -176,7 +176,8 @@ static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet
if (pFrom && pTo) { if (pFrom && pTo) {
// head // head
if (tsdbFileIsSame(pFrom, pTo, TSDB_HEAD_FILE)) { if (tsdbFileIsSame(pFrom, pTo, TSDB_HEAD_FILE)) {
ASSERT(0); ASSERT(pFrom->fHead.size == pTo->fHead.size);
ASSERT(pFrom->fHead.offset == pTo->fHead.offset);
} else { } else {
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname); tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname);
taosRemoveFile(fname); taosRemoveFile(fname);
......
...@@ -1220,8 +1220,25 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ ...@@ -1220,8 +1220,25 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2; if (!ppBuf2) ppBuf2 = &pBuf2;
pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, tBlockDataFirstKey(pBlockData)); TSKEY lastKey = TSKEY_MIN;
pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, tBlockDataLastKey(pBlockData)); for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = TSDBROW_KEY(&tsdbRowFromBlockData(pBlockData, iRow));
if (iRow == 0) {
pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, key);
}
if (iRow == pBlockData->nRow - 1) {
pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, key);
}
pBlock->minVersion = TMIN(pBlock->minVersion, key.version);
pBlock->maxVersion = TMAX(pBlock->maxVersion, key.version);
if (key.ts == lastKey) {
pBlock->hasDup = 1;
}
lastKey = key.ts;
}
pBlock->nRow += pBlockData->nRow;
pSubBlock->nRow = pBlockData->nRow; pSubBlock->nRow = pBlockData->nRow;
pSubBlock->cmprAlg = cmprAlg; pSubBlock->cmprAlg = cmprAlg;
......
...@@ -1085,7 +1085,7 @@ _exit: ...@@ -1085,7 +1085,7 @@ _exit:
return code; return code;
} }
int32_t tColDataPCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tColDataPCmprFn(const void *p1, const void *p2) {
SColData *pColData1 = *(SColData **)p1; SColData *pColData1 = *(SColData **)p1;
SColData *pColData2 = *(SColData **)p2; SColData *pColData2 = *(SColData **)p2;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册