提交 438d0cae 编写于 作者: H Hongze Cheng

mroe code

上级 9a4f6abe
...@@ -35,6 +35,7 @@ typedef enum { ...@@ -35,6 +35,7 @@ typedef enum {
TSDB_ITER_TYPE_MEMT, TSDB_ITER_TYPE_MEMT,
TSDB_ITER_TYPE_STT_TOMB, TSDB_ITER_TYPE_STT_TOMB,
TSDB_ITER_TYPE_DATA_TOMB, TSDB_ITER_TYPE_DATA_TOMB,
TSDB_ITER_TYPE_MEMT_TOMB,
} EIterType; } EIterType;
typedef struct { typedef struct {
...@@ -43,7 +44,7 @@ typedef struct { ...@@ -43,7 +44,7 @@ typedef struct {
SSttSegReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB SSttSegReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB
SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB
struct { struct {
SMemTable *memt; SMemTable *memt; // TSDB_ITER_TYPE_MEMT_TOMB
TSDBKEY from[1]; TSDBKEY from[1];
}; // TSDB_ITER_TYPE_MEMT }; // TSDB_ITER_TYPE_MEMT
}; };
......
...@@ -455,50 +455,123 @@ _exit: ...@@ -455,50 +455,123 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitDelData(SCommitter2 *committer) { static int32_t tsdbCommitTombDataToStt(SCommitter2 *committer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SMemTable *mem = committer->tsdb->imem; for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbSttFileWriteTombRecord(committer->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
if (mem->nDel == 0 // code = tsdbIterMergerNext(committer->iterMerger);
|| (committer->ctx->fset == NULL // TSDB_CHECK_CODE(code, lino, _exit);
&& committer->sttWriter == NULL) //
) {
committer->ctx->nextKey = committer->ctx->maxKey + 1;
goto _exit;
} }
SRBTreeIter iter[1] = {tRBTreeIterCreate(committer->tsdb->imem->tbDataTree, 1)}; _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) { static int32_t tsdbCommitTombDataToData(SCommitter2 *committer) {
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn); int32_t code = 0;
STombRecord record[1] = {{ int32_t lino = 0;
.suid = tbData->suid,
.uid = tbData->uid,
}};
for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) { if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) {
if (delData->eKey < committer->ctx->minKey) continue; for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
if (delData->sKey > committer->ctx->maxKey) { code = tsdbSttFileWriteTombRecord(committer->sttWriter, record);
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey); TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
record->version = delData->version; code = tsdbIterMergerNext(committer->iterMerger);
record->skey = TMAX(delData->sKey, committer->ctx->minKey); TSDB_CHECK_CODE(code, lino, _exit);
if (delData->eKey > committer->ctx->maxKey) { }
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, committer->ctx->maxKey + 1);
record->ekey = committer->ctx->maxKey;
} else { } else {
record->ekey = delData->eKey; for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbDataFileWriteTombRecord(committer->dataWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
code = tsdbSttFileWriteTombRecord(committer->sttWriter, record); _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitTombDataOpenIter(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
STsdbIter *iter;
STsdbIterConfig config[1];
if (committer->sttReader) {
const TSttSegReaderArray *readerArray;
tsdbSttFileReaderGetSegReader(committer->sttReader, &readerArray);
SSttSegReader *segReader;
TARRAY2_FOREACH(readerArray, segReader) {
config->type = TSDB_ITER_TYPE_STT_TOMB;
config->sttReader = segReader;
code = tsdbIterOpen(config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->iterArray, iter);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
config->type = TSDB_ITER_TYPE_MEMT_TOMB;
config->memt = committer->tsdb->imem;
code = tsdbIterOpen(config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->iterArray, iter);
TSDB_CHECK_CODE(code, lino, _exit);
// open iter
code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, true);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
} }
return code;
}
static int32_t tsdbCommitTombDataCloseIter(SCommitter2 *committer) {
tsdbIterMergerClose(&committer->iterMerger);
TARRAY2_CLEAR(committer->iterArray, tsdbIterClose);
return 0;
}
static int32_t tsdbCommitTombData(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbCommitTombDataOpenIter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
if (committer->sttTrigger > 1) {
code = tsdbCommitTombDataToStt(committer);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbCommitTombDataToData(committer);
TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbCommitTombDataCloseIter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
...@@ -578,7 +651,7 @@ static int32_t tsdbCommitFileSet(SCommitter2 *committer) { ...@@ -578,7 +651,7 @@ static int32_t tsdbCommitFileSet(SCommitter2 *committer) {
code = tsdbCommitTSData(committer); code = tsdbCommitTSData(committer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommitDelData(committer); code = tsdbCommitTombData(committer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// fset commit end // fset commit end
......
...@@ -64,6 +64,12 @@ struct STsdbIter { ...@@ -64,6 +64,12 @@ struct STsdbIter {
STombBlock tombBlock[1]; STombBlock tombBlock[1];
int32_t tombBlockIdx; int32_t tombBlockIdx;
} dataTomb[1]; } dataTomb[1];
struct {
SMemTable *memt;
SRBTreeIter rbtIter[1];
STbData *tbData;
SDelData *delData;
} memtTomb[1];
}; };
}; };
...@@ -259,6 +265,45 @@ _exit: ...@@ -259,6 +265,45 @@ _exit:
return 0; return 0;
} }
static int32_t tsdbMemTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
while (!iter->ctx->noMoreData) {
for (; iter->memtTomb->delData;) {
if (tbid && tbid->uid == iter->memtTomb->tbData->uid) {
iter->memtTomb->delData = NULL;
break;
}
iter->record->suid = iter->memtTomb->tbData->suid;
iter->record->uid = iter->memtTomb->tbData->uid;
iter->record->version = iter->memtTomb->delData->version;
iter->record->skey = iter->memtTomb->delData->sKey;
iter->record->ekey = iter->memtTomb->delData->eKey;
iter->memtTomb->delData = iter->memtTomb->delData->pNext;
goto _exit;
}
for (;;) {
SRBTreeNode *node = tRBTreeIterNext(iter->memtTomb->rbtIter);
if (node == NULL) {
iter->ctx->noMoreData = true;
goto _exit;
}
iter->memtTomb->tbData = TCONTAINER_OF(node, STbData, rbtn);
if (tbid && tbid->uid == iter->memtTomb->tbData->uid) {
continue;
} else {
iter->memtTomb->delData = iter->memtTomb->tbData->pHead;
break;
}
}
}
_exit:
return 0;
}
static int32_t tsdbSttIterOpen(STsdbIter *iter) { static int32_t tsdbSttIterOpen(STsdbIter *iter) {
int32_t code; int32_t code;
...@@ -330,6 +375,13 @@ static int32_t tsdbDataTombIterOpen(STsdbIter *iter) { ...@@ -330,6 +375,13 @@ static int32_t tsdbDataTombIterOpen(STsdbIter *iter) {
return tsdbDataTombIterNext(iter, NULL); return tsdbDataTombIterNext(iter, NULL);
} }
static int32_t tsdbMemTombIterOpen(STsdbIter *iter) {
int32_t code;
iter->memtTomb->rbtIter[0] = tRBTreeIterCreate(iter->memtTomb->memt->tbDataTree, 1);
return tsdbMemTombIterNext(iter, NULL);
}
static int32_t tsdbDataIterClose(STsdbIter *iter) { static int32_t tsdbDataIterClose(STsdbIter *iter) {
tBrinBlockDestroy(iter->dataData->brinBlock); tBrinBlockDestroy(iter->dataData->brinBlock);
tBlockDataDestroy(iter->dataData->blockData); tBlockDataDestroy(iter->dataData->blockData);
...@@ -432,6 +484,10 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { ...@@ -432,6 +484,10 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
iter[0]->dataTomb->reader = config->dataReader; iter[0]->dataTomb->reader = config->dataReader;
code = tsdbDataTombIterOpen(iter[0]); code = tsdbDataTombIterOpen(iter[0]);
break; break;
case TSDB_ITER_TYPE_MEMT_TOMB:
iter[0]->memtTomb->memt = config->memt;
code = tsdbMemTombIterOpen(iter[0]);
break;
default: default:
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
ASSERTS(false, "Not implemented"); ASSERTS(false, "Not implemented");
...@@ -471,6 +527,8 @@ int32_t tsdbIterClose(STsdbIter **iter) { ...@@ -471,6 +527,8 @@ int32_t tsdbIterClose(STsdbIter **iter) {
case TSDB_ITER_TYPE_DATA_TOMB: case TSDB_ITER_TYPE_DATA_TOMB:
tsdbDataTombIterClose(iter[0]); tsdbDataTombIterClose(iter[0]);
break; break;
case TSDB_ITER_TYPE_MEMT_TOMB:
break;
default: default:
ASSERT(false); ASSERT(false);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册