提交 23e82ff6 编写于 作者: H Hongze Cheng

more code

上级 bc9f53dd
...@@ -172,6 +172,7 @@ int32_t tGetDelData(uint8_t *p, void *ph); ...@@ -172,6 +172,7 @@ int32_t tGetDelData(uint8_t *p, void *ph);
void tMapDataReset(SMapData *pMapData); void tMapDataReset(SMapData *pMapData);
void tMapDataClear(SMapData *pMapData); void tMapDataClear(SMapData *pMapData);
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)); int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *));
int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo);
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem); int32_t (*tItemCmprFn)(const void *, const void *), void *pItem);
......
...@@ -591,7 +591,7 @@ _err: ...@@ -591,7 +591,7 @@ _err:
return code; return code;
} }
static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { static int32_t tsdbMergeCommitDataBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0; int32_t code = 0;
STbData *pTbData = pIter->pTbData; STbData *pTbData = pIter->pTbData;
SBlockData *pBlockDataR = &pCommitter->dReader.bData; SBlockData *pBlockDataR = &pCommitter->dReader.bData;
...@@ -941,25 +941,9 @@ _err: ...@@ -941,25 +941,9 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) {
int32_t code = 0; int32_t code = 0;
ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0);
ASSERT(pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, pTbData) >= 0);
// merge commit table data
STbDataIter iter = {0};
STbDataIter *pIter = &iter;
TSDBROW *pRow;
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
if (pRow == NULL) goto _exit;
int32_t iBlock = 0; int32_t iBlock = 0;
SBlock block; SBlock block;
SBlock *pBlock = █ SBlock *pBlock = █
...@@ -1010,7 +994,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { ...@@ -1010,7 +994,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock); code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
if (code) goto _err; if (code) goto _err;
} else { } else {
code = tsdbMergeCommitData(pCommitter, pIter, pBlock); code = tsdbMergeCommitDataBlock(pCommitter, pIter, pBlock);
if (code) goto _err; if (code) goto _err;
} }
...@@ -1041,11 +1025,46 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { ...@@ -1041,11 +1025,46 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
} }
} }
// .data append and .last merge _exit:
code = tsdbMergeCommitLast(pCommitter, pIter); return code;
_err:
return code;
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
int32_t code = 0;
ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0);
// merge commit table data
STbDataIter iter = {0};
TSDBROW *pRow;
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, &iter);
pRow = tsdbTbDataIterGet(&iter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
if (pRow == NULL) {
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) == 0) {
code = tMapDataCopy(&pCommitter->dReader.mBlock, &pCommitter->dWriter.mBlock);
if (code) goto _err;
}
goto _exit;
}
// commit data
code = tsdbMergeCommitData(pCommitter, &iter);
if (code) goto _err; if (code) goto _err;
// end // commit last
code = tsdbMergeCommitLast(pCommitter, &iter);
if (code) goto _err;
_exit:
if (pCommitter->dWriter.mBlock.nItem > 0) { if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid}; SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid};
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
...@@ -1056,9 +1075,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { ...@@ -1056,9 +1075,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
goto _err; goto _err;
} }
} }
pRow = tsdbTbDataIterGet(&iter);
_exit:
pRow = tsdbTbDataIterGet(pIter);
if (pRow) { if (pRow) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
} }
...@@ -1126,71 +1143,6 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -1126,71 +1143,6 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
if (code) goto _err; if (code) goto _err;
} }
// .last
while (true) {
if (pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, &toTable) >= 0) break;
SBlockData *pBlockDataR = &pCommitter->dReader.bDatal;
SBlockData *pBlockDataW = &pCommitter->dWriter.bDatal;
tb_uid_t suid = pCommitter->dReader.pRowInfo->suid;
tb_uid_t uid = pCommitter->dReader.pRowInfo->uid;
ASSERT((pBlockDataR->suid && !pBlockDataR->uid) || (!pBlockDataR->suid && pBlockDataR->uid));
ASSERT(pBlockDataR->nRow > 0);
// commit and reset block data schema if need
if (pBlockDataW->suid || pBlockDataW->uid) {
if (pBlockDataW->suid != suid || pBlockDataW->suid == 0) {
if (pBlockDataW->nRow > 0) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
tBlockDataReset(pBlockDataW);
}
}
// set block data schema if need
if (pBlockDataW->suid == 0 && pBlockDataW->uid == 0) {
code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid);
if (code) goto _err;
code = tBlockDataInit(pBlockDataW, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema);
if (code) goto _err;
}
// check if it can make sure that one table data in one block
int32_t nRow = 0;
if (pBlockDataR->suid) {
int32_t iRow = pCommitter->dReader.iRow;
while ((iRow < pBlockDataR->nRow) && (pBlockDataR->aUid[iRow] == uid)) {
nRow++;
iRow++;
}
} else {
ASSERT(pCommitter->dReader.iRow == 0);
nRow = pBlockDataR->nRow;
}
ASSERT(nRow > 0 && nRow < pCommitter->minRow);
if (pBlockDataW->nRow + nRow > pCommitter->maxRow) {
ASSERT(pBlockDataW->nRow > 0);
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
while (nRow > 0) {
code = tBlockDataAppendRow(pBlockDataW, &pCommitter->dReader.pRowInfo->row, NULL, uid);
if (code) goto _err;
code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err;
nRow--;
}
}
return code; return code;
_err: _err:
......
...@@ -51,6 +51,22 @@ _exit: ...@@ -51,6 +51,22 @@ _exit:
return code; return code;
} }
int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo) {
int32_t code = 0;
pTo->nItem = pFrom->nItem;
pTo->nData = pFrom->nData;
code = tRealloc((uint8_t **)&pTo->aOffset, sizeof(int32_t) * pFrom->nItem);
if (code) goto _exit;
code = tRealloc(&pTo->pData, pFrom->nData);
if (code) goto _exit;
memcpy(pTo->aOffset, pFrom->aOffset, sizeof(int32_t) * pFrom->nItem);
memcpy(pTo->pData, pFrom->pData, pFrom->nData);
_exit:
return code;
}
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) { int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) {
int32_t code = 0; int32_t code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册