提交 a4ca7704 编写于 作者: H Hongze Cheng

refact code

上级 94a7be4d
......@@ -104,204 +104,6 @@ static int32_t tsdbCommitCloseWriter(SCommitter2 *committer) {
return tsdbFSetWriterClose(&committer->writer, 0, committer->fopArray);
}
#if 0
static int32_t tsdbCommitTSDataToDataTableBegin(SCommitter2 *committer, const TABLEID *tbid) {
int32_t code = 0;
int32_t lino = 0;
committer->ctx->tbid->suid = tbid->suid;
committer->ctx->tbid->uid = tbid->uid;
code = tsdbUpdateSkmTb(committer->tsdb, committer->ctx->tbid, committer->skmTb);
TSDB_CHECK_CODE(code, lino, _exit);
committer->blockDataIdx = 0;
for (int32_t i = 0; i < ARRAY_SIZE(committer->blockData); i++) {
code = tBlockDataInit(&committer->blockData[i], committer->ctx->tbid, committer->skmTb->pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitTSDataToDataTableEnd(SCommitter2 *committer) {
if (committer->ctx->tbid->uid == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
int32_t cidx = committer->blockDataIdx;
int32_t pidx = ((cidx + 1) & 1);
int32_t numRow = (committer->blockData[cidx].nRow + committer->blockData[pidx].nRow) / 2;
if (committer->blockData[pidx].nRow > 0 && numRow >= committer->minRow) {
ASSERT(committer->blockData[pidx].nRow == committer->maxRow);
SRowInfo row[1] = {{
.suid = committer->ctx->tbid->suid,
.uid = committer->ctx->tbid->uid,
.row = tsdbRowFromBlockData(committer->blockData + pidx, 0),
}};
for (int32_t i = 0; i < numRow; i++) {
row->row.iRow = i;
code = tsdbDataFileWriteRow(committer->dataWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDataFileFlush(committer->dataWriter);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = numRow; i < committer->blockData[pidx].nRow; i++) {
row->row.iRow = i;
code = tsdbDataFileWriteRow(committer->dataWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
}
row->row = tsdbRowFromBlockData(committer->blockData + cidx, 0);
for (int32_t i = 0; i < committer->blockData[cidx].nRow; i++) {
row->row.iRow = i;
code = tsdbDataFileWriteRow(committer->dataWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
if (committer->blockData[pidx].nRow > 0) {
code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (committer->blockData[cidx].nRow < committer->minRow) {
code = tsdbSttFileWriteBlockData(committer->sttWriter, committer->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
for (int32_t i = 0; i < ARRAY_SIZE(committer->blockData); i++) {
tBlockDataReset(&committer->blockData[i]);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
SMetaInfo info;
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) {
if (row->uid != committer->ctx->tbid->uid) {
// end last table write
code = tsdbCommitTSDataToDataTableEnd(committer);
TSDB_CHECK_CODE(code, lino, _exit);
// Ignore table of obsolescence
if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, (TABLEID *)row);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
code = tsdbCommitTSDataToDataTableBegin(committer, (TABLEID *)row);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (row->row.type == TSDBROW_ROW_FMT) {
code = tsdbUpdateSkmRow(committer->tsdb, committer->ctx->tbid, TSDBROW_SVERSION(&row->row), committer->skmRow);
TSDB_CHECK_CODE(code, lino, _exit);
}
TSDBKEY key = TSDBROW_KEY(&row->row);
if (key.version <= committer->compactVersion //
&& committer->blockData[committer->blockDataIdx].nRow > 0 //
&& key.ts == committer->blockData[committer->blockDataIdx]
.aTSKEY[committer->blockData[committer->blockDataIdx].nRow - 1]) {
code =
tBlockDataUpdateRow(committer->blockData + committer->blockDataIdx, &row->row, committer->skmRow->pTSchema);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (committer->blockData[committer->blockDataIdx].nRow >= committer->maxRow) {
int32_t idx = ((committer->blockDataIdx + 1) & 1);
if (committer->blockData[idx].nRow >= committer->maxRow) {
code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + idx);
TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataClear(committer->blockData + idx);
}
committer->blockDataIdx = idx;
}
code = tBlockDataAppendRow(&committer->blockData[committer->blockDataIdx], &row->row, committer->skmRow->pTSchema,
row->uid);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbIterMergerNext(committer->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbCommitTSDataToDataTableEnd(committer);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitTSDataToStt(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
ASSERT(committer->sttReader == NULL);
SMetaInfo info;
for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) {
if (row->uid != committer->ctx->tbid->uid) {
committer->ctx->tbid->suid = row->suid;
committer->ctx->tbid->uid = row->uid;
// Ignore table of obsolescence
if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
}
TSKEY ts = TSDBROW_TS(&row->row);
if (ts > committer->ctx->maxKey) {
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbSttFileWriteRow(committer->sttWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
#endif
static int32_t tsdbCommitTSData(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
......
......@@ -97,197 +97,6 @@ _exit:
return code;
}
#if 0
static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
if (merger->ctx->blockData[0].nRow + merger->ctx->blockData[1].nRow == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
int32_t cidx = merger->ctx->blockDataIdx;
int32_t pidx = (cidx + 1) % 2;
int32_t numRow = (merger->ctx->blockData[pidx].nRow + merger->ctx->blockData[cidx].nRow) / 2;
if (merger->ctx->blockData[pidx].nRow > 0 && numRow >= merger->minRow) {
ASSERT(merger->ctx->blockData[pidx].nRow == merger->maxRow);
SRowInfo row[1] = {{
.suid = merger->ctx->tbid->suid,
.uid = merger->ctx->tbid->uid,
.row = tsdbRowFromBlockData(merger->ctx->blockData + pidx, 0),
}};
for (int32_t i = 0; i < numRow; i++) {
row->row.iRow = i;
code = tsdbDataFileWriteRow(merger->dataWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDataFileFlush(merger->dataWriter);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = numRow; i < merger->ctx->blockData[pidx].nRow; i++) {
row->row.iRow = i;
code = tsdbDataFileWriteRow(merger->dataWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
}
row->row = tsdbRowFromBlockData(merger->ctx->blockData + cidx, 0);
for (int32_t i = 0; i < merger->ctx->blockData[cidx].nRow; i++) {
row->row.iRow = i;
code = tsdbDataFileWriteRow(merger->dataWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
if (merger->ctx->blockData[pidx].nRow > 0) {
code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (merger->ctx->blockData[cidx].nRow < merger->minRow) {
code = tsdbSttFileWriteBlockData(merger->sttWriter, merger->ctx->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) {
tBlockDataReset(merger->ctx->blockData + i);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbMergeToDataTableBegin(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb);
TSDB_CHECK_CODE(code, lino, _exit);
merger->ctx->blockDataIdx = 0;
for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) {
code = tBlockDataInit(merger->ctx->blockData + i, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbMergeToDataLevel(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
// data
for (SRowInfo *row; (row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL;) {
if (row->uid != merger->ctx->tbid->uid) {
code = tsdbMergeToDataTableEnd(merger);
TSDB_CHECK_CODE(code, lino, _exit);
merger->ctx->tbid->suid = row->suid;
merger->ctx->tbid->uid = row->uid;
code = tsdbMergeToDataTableBegin(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};
if (key->version <= merger->compactVersion //
&& merger->ctx->blockData[merger->ctx->blockDataIdx].nRow > 0 //
&& merger->ctx->blockData[merger->ctx->blockDataIdx]
.aTSKEY[merger->ctx->blockData[merger->ctx->blockDataIdx].nRow - 1] == key->ts) {
// update
code = tBlockDataUpdateRow(merger->ctx->blockData + merger->ctx->blockDataIdx, &row->row, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (merger->ctx->blockData[merger->ctx->blockDataIdx].nRow >= merger->maxRow) {
int32_t idx = (merger->ctx->blockDataIdx + 1) % 2;
code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + idx);
TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataClear(merger->ctx->blockData + idx);
// switch to next bData
merger->ctx->blockDataIdx = idx;
}
code = tBlockDataAppendRow(merger->ctx->blockData + merger->ctx->blockDataIdx, &row->row, NULL, row->uid);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbIterMergerNext(merger->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbMergeToDataTableEnd(merger);
TSDB_CHECK_CODE(code, lino, _exit);
// tomb
STombRecord *record;
while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger))) {
if (tsdbSttFileWriterIsOpened(merger->sttWriter)) {
code = tsdbSttFileWriteTombRecord(merger->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDataFileWriteTombRecord(merger->dataWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbIterMergerNext(merger->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
// data
SRowInfo *row;
while ((row = tsdbIterMergerGetData(merger->dataIterMerger))) {
code = tsdbSttFileWriteRow(merger->sttWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(merger->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
// tomb
STombRecord *record;
while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger))) {
code = tsdbSttFileWriteTombRecord(merger->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(merger->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
}
return code;
}
#endif
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册