提交 1427f921 编写于 作者: H Hongze Cheng

more code

上级 3b3b8d4f
......@@ -65,12 +65,13 @@ typedef struct {
SRBTree rtree;
STsdbDataIter *pIter;
SBlockData bData;
SSkmInfo tbSkm;
} STsdbCompactor;
#define TSDB_FLG_DEEP_COMPACT 0x1
// ITER =========================
static int32_t tsdbDataIterNext(STsdbDataIter *pIter);
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId);
static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n));
......@@ -132,7 +133,7 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter)
pDataDIter->iDataBlk = 0;
pDataDIter->iRow = 0;
code = tsdbDataIterNext(pIter);
code = tsdbDataIterNext(pIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......@@ -182,7 +183,7 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIt
pSttDIter->iSttBlk = -1;
pSttDIter->iRow = -1;
code = tsdbDataIterNext(pIter);
code = tsdbDataIterNext(pIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......@@ -205,7 +206,7 @@ static void tsdbDataIterClose(STsdbDataIter *pIter) {
ASSERT(0);
}
static int32_t tsdbDataIterNext(STsdbDataIter *pIter) {
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) {
int32_t code = 0;
int32_t lino = 0;
......@@ -218,25 +219,49 @@ static int32_t tsdbDataIterNext(STsdbDataIter *pIter) {
} else if (pIter->flag & TSDB_ITER_TYPE_STT) {
SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
pSttDIter->iRow++;
if (pSttDIter->iRow < pSttDIter->bData.nRow) {
for (;;) {
if (++pSttDIter->iRow >= pSttDIter->bData.nRow) {
for (;;) {
if (++pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) {
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk);
// check exclusion
if (pExcludeTableId) {
if (pExcludeTableId->uid) { // exclude (suid, uid)
if (pSttBlk->minUid == pExcludeTableId->uid && pSttBlk->maxUid == pExcludeTableId->uid) continue;
} else { // exclude (suid, *)
if (pSttBlk->suid == pExcludeTableId->suid) continue;
}
}
code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt, pSttBlk, &pSttDIter->bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->rowInfo.suid = pSttBlk->suid;
pSttDIter->iRow = 0;
break;
} else {
// iter end, all set 0 and exit
pIter->rowInfo.suid = 0;
pIter->rowInfo.uid = 0;
goto _exit;
}
}
}
pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);
} else {
pSttDIter->iSttBlk++;
if (pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) {
code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt,
taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData);
TSDB_CHECK_CODE(code, lino, _exit);
pSttDIter->iRow = 0;
pIter->rowInfo.suid = pSttDIter->bData.suid;
pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);
} else {
pIter->rowInfo.suid = 0;
pIter->rowInfo.uid = 0;
// check exclusion
if (pExcludeTableId) {
if (pExcludeTableId->uid) { // exclude (suid, uid)
if (pIter->rowInfo.uid == pExcludeTableId->uid) continue;
} else { // exclude (suid, *)
if (pIter->rowInfo.suid == pExcludeTableId->suid) continue;
}
}
break;
}
} else {
ASSERT(0);
......@@ -330,37 +355,94 @@ _exit:
return code;
}
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
static int32_t tsdbCompactNextRowImpl(STsdbCompactor *pCompactor, TABLEID *pExcludeTableId) {
int32_t code = 0;
int32_t lino = 0;
if (pCompactor->pIter) {
code = tsdbDataIterNext(pCompactor->pIter);
TSDB_CHECK_CODE(code, lino, _exit);
for (;;) {
if (pCompactor->pIter) {
code = tsdbDataIterNext(pCompactor->pIter, pExcludeTableId);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
pCompactor->pIter = NULL;
} else {
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
if (pNode) {
STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
pCompactor->pIter = NULL;
} else {
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
if (pNode) {
STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo);
ASSERT(c);
int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo);
ASSERT(c);
if (c > 0) {
tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n);
pCompactor->pIter = NULL;
if (c > 0) {
tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n);
pCompactor->pIter = NULL;
}
}
}
}
if (pCompactor->pIter == NULL) {
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
if (pNode) {
pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
tRBTreeDrop(&pCompactor->rtree, pNode);
if (pExcludeTableId) {
if (pExcludeTableId->uid) {
if (pCompactor->pIter->rowInfo.uid == pExcludeTableId->uid) continue;
} else {
if (pCompactor->pIter->rowInfo.suid == pExcludeTableId->suid) continue;
}
}
}
}
break;
}
if (pCompactor->pIter == NULL) {
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
if (pNode) {
pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
tRBTreeDrop(&pCompactor->rtree, pNode);
_exit:
return code;
}
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
code = tsdbCompactNextRowImpl(pCompactor, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
// check if the table of the row exists
if (pCompactor->pIter) {
if (pCompactor->pIter->rowInfo.suid == pCompactor->tbSkm.suid &&
pCompactor->pIter->rowInfo.uid == pCompactor->tbSkm.uid) {
break;
} else {
SMetaInfo info;
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, &info, NULL) !=
TSDB_CODE_SUCCESS) {
// table not exist
} else {
// update table schema
STSchema *pTSchema =
metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, info.version, 1);
if (pTSchema == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCompactor->tbSkm.suid = pCompactor->pIter->rowInfo.suid;
pCompactor->tbSkm.uid = pCompactor->pIter->rowInfo.uid;
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
pCompactor->tbSkm.pTSchema = pTSchema;
break;
}
}
} else {
// iter end, just break out
break;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册