提交 8d6e6d2f 编写于 作者: H Hongze Cheng

more code

上级 c398f9d2
......@@ -33,9 +33,12 @@ typedef int32_t (*tRBTreeCmprFn)(const SRBTreeNode *, const SRBTreeNode *);
#define tRBTreeMax(T) ((T)->max == ((T)->NIL) ? NULL : (T)->max)
void tRBTreeCreate(SRBTree *pTree, tRBTreeCmprFn cmprFn);
void tRBTreeClear(SRBTree *pTree);
SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z);
void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z);
SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey);
SRBTreeNode *tRBTreeDropMin(SRBTree *pTree);
SRBTreeNode *tRBTreeDropMax(SRBTree *pTree);
SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode);
// SRBTreeIter =============================================
......
......@@ -320,6 +320,7 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData,
#define TSDB_TOMB_FILE_DATA_ITER 3
#define TSDB_FILTER_FLAG_BY_VERSION 0x1
#define TSDB_FILTER_FLAG_BY_TABLEID 0x2
#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2 *)(((char *)pNode) - offsetof(STsdbDataIter2, rbtn)))
/* open */
......@@ -906,6 +907,7 @@ struct STsdbFilterInfo {
int32_t flag;
int64_t sver;
int64_t ever;
TABLEID tbid;
};
#ifdef __cplusplus
......
......@@ -310,43 +310,67 @@ _exit:
return code;
}
static bool tsdbCompactTableIsDropped(STsdbCompactor *pCompactor) {
SMetaInfo info;
if (pCompactor->pIter->rowInfo.uid == pCompactor->tbid.uid) return false;
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->tbid.uid, &info, NULL)) {
return true;
}
return false;
}
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo) {
int32_t code = 0;
int32_t lino = 0;
if (pCompactor->pIter) {
code = tsdbDataIterNext2(pCompactor->pIter, NULL /* TODO */);
TSDB_CHECK_CODE(code, lino, _exit);
for (;;) {
if (pCompactor->pIter) {
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
pCompactor->pIter = NULL;
} else {
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt);
if (pNode) {
int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pNode);
if (c > 0) {
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
pCompactor->pIter = NULL;
} else if (c == 0) {
ASSERT(0);
if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
pCompactor->pIter = NULL;
} else {
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt);
if (pNode) {
int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pNode);
if (c > 0) {
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
pCompactor->pIter = NULL;
} else if (c == 0) {
ASSERT(0);
}
}
}
}
}
if (pCompactor->pIter == NULL) {
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt);
if (pNode) {
tRBTreeDrop(&pCompactor->rbt, pNode);
pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
if (pCompactor->pIter == NULL) {
SRBTreeNode *pNode = tRBTreeDropMin(&pCompactor->rbt);
if (pNode) {
pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
}
}
}
if (ppRowInfo) {
if (pCompactor->pIter) {
*ppRowInfo = &pCompactor->pIter->rowInfo;
if (tsdbCompactTableIsDropped(pCompactor)) {
TABLEID tbid = {.suid = pCompactor->pIter->rowInfo.suid, .uid = pCompactor->pIter->rowInfo.uid};
tRBTreeClear(&pCompactor->rbt);
for (pCompactor->pIter = pCompactor->iterList; pCompactor->pIter; pCompactor->pIter = pCompactor->pIter->next) {
code = tsdbDataIterNext2(pCompactor->pIter,
&(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_TABLEID, .tbid = tbid});
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid) {
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
}
}
} else {
*ppRowInfo = &pCompactor->pIter->rowInfo;
break;
}
} else {
*ppRowInfo = NULL;
break;
}
}
......@@ -497,7 +521,7 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) {
code = tsdbCompactFileSetStart(pCompactor, pSet);
TSDB_CHECK_CODE(code, lino, _exit);
// do compact
// do compact, end with a NULL row
SRowInfo *pRowInfo;
do {
code = tsdbCompactNextRow(pCompactor, &pRowInfo);
......
......@@ -202,6 +202,13 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
for (;;) {
while (pIter->dIter.iRow < pIter->dIter.bData.nRow) {
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID) {
if (pFilterInfo->tbid.uid == pIter->dIter.bData.uid) {
pIter->dIter.iRow = pIter->dIter.bData.nRow;
continue;
}
}
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver ||
pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) {
......@@ -211,8 +218,8 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
}
}
pIter->rowInfo.suid = pIter->dIter.bData.suid;
pIter->rowInfo.uid = pIter->dIter.bData.uid;
ASSERT(pIter->rowInfo.suid == pIter->dIter.bData.suid);
ASSERT(pIter->rowInfo.uid = pIter->dIter.bData.uid);
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
pIter->dIter.iRow++;
goto _exit;
......@@ -225,6 +232,13 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
// filter
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID) {
if (tTABLEIDCmprFn(&pFilterInfo->tbid, &pIter->rowInfo) == 0) {
pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem;
continue;
}
}
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) {
pIter->dIter.iDataBlk++;
......@@ -248,9 +262,22 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
if (pFilterInfo && (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID)) {
int32_t c = tTABLEIDCmprFn(pBlockIdx, &pFilterInfo->tbid);
if (c == 0) {
pIter->dIter.iBlockIdx++;
continue;
} else if (c < 0) {
ASSERT(0);
}
}
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->rowInfo.suid = pBlockIdx->suid;
pIter->rowInfo.uid = pBlockIdx->uid;
pIter->dIter.iBlockIdx++;
pIter->dIter.iDataBlk = 0;
......@@ -277,6 +304,14 @@ static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* p
for (;;) {
while (pIter->sIter.iRow < pIter->sIter.bData.nRow) {
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID) {
int64_t uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
if (pFilterInfo->tbid.uid == uid) {
pIter->sIter.iRow++;
continue;
}
}
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] ||
pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) {
......@@ -298,6 +333,14 @@ static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* p
SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID) {
if (pSttBlk->suid == pFilterInfo->tbid.suid && pSttBlk->minUid == pFilterInfo->tbid.uid &&
pSttBlk->maxUid == pFilterInfo->tbid.uid) {
pIter->sIter.iSttBlk++;
continue;
}
}
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) {
pIter->sIter.iSttBlk++;
......
......@@ -1250,10 +1250,7 @@ _exit:
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) {
if (pBlockData->nRow == 0) {
return 1;
}
int64_t luid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
if (luid == uid && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
} else if (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
return pBlockData->nRow;
} else {
return pBlockData->nRow + 1;
......@@ -1261,31 +1258,7 @@ int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t ui
}
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
int32_t code = 0;
ASSERT(pBlockData->suid || pBlockData->uid);
if (pBlockData->nRow == 0) {
pBlockData->uid = uid;
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
}
if (pBlockData->uid && pBlockData->uid != uid) {
ASSERT(pBlockData->suid);
code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1));
if (code) return code;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
pBlockData->aUid[iRow] = pBlockData->uid;
}
pBlockData->uid = 0;
}
// decide append/update row
int64_t luid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
if (luid == uid && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
if (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
return tBlockDataUpdateRow(pBlockData, pRow, pTSchema);
} else {
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
......
......@@ -147,6 +147,10 @@ static SRBTreeNode *tRBTreePredecessor(SRBTree *pTree, SRBTreeNode *pNode) {
void tRBTreeCreate(SRBTree *pTree, tRBTreeCmprFn cmprFn) {
pTree->cmprFn = cmprFn;
tRBTreeClear(pTree);
}
void tRBTreeClear(SRBTree *pTree) {
pTree->n = 0;
pTree->NIL = &pTree->NILNODE;
pTree->NIL->color = BLACK;
......@@ -423,6 +427,22 @@ SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey) {
return pNode;
}
SRBTreeNode *tRBTreeDropMin(SRBTree *pTree) {
SRBTreeNode *pNode = tRBTreeMin(pTree);
if (pNode) {
tRBTreeDrop(pTree, pNode);
}
return pNode;
}
SRBTreeNode *tRBTreeDropMax(SRBTree *pTree) {
SRBTreeNode *pNode = tRBTreeMax(pTree);
if (pNode) {
tRBTreeDrop(pTree, pNode);
}
return pNode;
}
SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode) {
SRBTreeNode *pNode = pTree->root;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册