提交 e4840cb5 编写于 作者: H Hongze Cheng

fix

上级 bd01d08b
...@@ -695,7 +695,7 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { ...@@ -695,7 +695,7 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
brinBlk->size[i]); brinBlk->size[i]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
brinBlk->dp[i].size += brinBlk->size[i]; brinBlk->dp->size += brinBlk->size[i];
writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[i]; writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[i];
} }
...@@ -710,7 +710,7 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { ...@@ -710,7 +710,7 @@ static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
brinBlk->size[j]); brinBlk->size[j]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
brinBlk->dp[i].size += brinBlk->size[j]; brinBlk->dp->size += brinBlk->size[j];
writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[j]; writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[j];
} }
......
...@@ -39,8 +39,8 @@ typedef struct { ...@@ -39,8 +39,8 @@ typedef struct {
SSttLvl *lvl; SSttLvl *lvl;
STFileObj *fobj; STFileObj *fobj;
TABLEID tbid[1]; TABLEID tbid[1];
int32_t bDataIdx; int32_t blockDataIdx;
SBlockData bData[2]; SBlockData blockData[2];
} ctx[1]; } ctx[1];
TFileOpArray fopArr[1]; TFileOpArray fopArr[1];
...@@ -96,8 +96,8 @@ static int32_t tsdbMergerClose(SMerger *merger) { ...@@ -96,8 +96,8 @@ static int32_t tsdbMergerClose(SMerger *merger) {
TARRAY2_DESTROY(merger->dataIterArr, NULL); TARRAY2_DESTROY(merger->dataIterArr, NULL);
TARRAY2_DESTROY(merger->sttReaderArr, NULL); TARRAY2_DESTROY(merger->sttReaderArr, NULL);
TARRAY2_DESTROY(merger->fopArr, NULL); TARRAY2_DESTROY(merger->fopArr, NULL);
for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) { for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) {
tBlockDataDestroy(merger->ctx->bData + i); tBlockDataDestroy(merger->ctx->blockData + i);
} }
tDestroyTSchema(merger->skmTb->pTSchema); tDestroyTSchema(merger->skmTb->pTSchema);
tDestroyTSchema(merger->skmRow->pTSchema); tDestroyTSchema(merger->skmRow->pTSchema);
...@@ -110,21 +110,21 @@ _exit: ...@@ -110,21 +110,21 @@ _exit:
} }
static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
if (merger->ctx->bData[0].nRow + merger->ctx->bData[1].nRow == 0) return 0; if (merger->ctx->blockData[0].nRow + merger->ctx->blockData[1].nRow == 0) return 0;
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t cidx = merger->ctx->bDataIdx; int32_t cidx = merger->ctx->blockDataIdx;
int32_t pidx = (cidx + 1) % 2; int32_t pidx = (cidx + 1) % 2;
int32_t numRow = (merger->ctx->bData[pidx].nRow + merger->ctx->bData[cidx].nRow) / 2; int32_t numRow = (merger->ctx->blockData[pidx].nRow + merger->ctx->blockData[cidx].nRow) / 2;
if (merger->ctx->bData[pidx].nRow > 0 && numRow >= merger->minRow) { if (merger->ctx->blockData[pidx].nRow > 0 && numRow >= merger->minRow) {
ASSERT(merger->ctx->bData[pidx].nRow == merger->maxRow); ASSERT(merger->ctx->blockData[pidx].nRow == merger->maxRow);
SRowInfo row[1] = {{ SRowInfo row[1] = {{
.suid = merger->ctx->tbid->suid, .suid = merger->ctx->tbid->suid,
.uid = merger->ctx->tbid->uid, .uid = merger->ctx->tbid->uid,
.row = tsdbRowFromBlockData(merger->ctx->bData + pidx, 0), .row = tsdbRowFromBlockData(merger->ctx->blockData + pidx, 0),
}}; }};
for (int32_t i = 0; i < numRow; i++) { for (int32_t i = 0; i < numRow; i++) {
...@@ -137,34 +137,34 @@ static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { ...@@ -137,34 +137,34 @@ static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
code = tsdbDataFileFlush(merger->dataWriter); code = tsdbDataFileFlush(merger->dataWriter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = numRow; i < merger->ctx->bData[pidx].nRow; i++) { for (int32_t i = numRow; i < merger->ctx->blockData[pidx].nRow; i++) {
row->row.iRow = i; row->row.iRow = i;
code = tsdbDataFileWriteRow(merger->dataWriter, row); code = tsdbDataFileWriteRow(merger->dataWriter, row);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
row->row = tsdbRowFromBlockData(merger->ctx->bData + cidx, 0); row->row = tsdbRowFromBlockData(merger->ctx->blockData + cidx, 0);
for (int32_t i = 0; i < merger->ctx->bData[cidx].nRow; i++) { for (int32_t i = 0; i < merger->ctx->blockData[cidx].nRow; i++) {
row->row.iRow = i; row->row.iRow = i;
code = tsdbDataFileWriteRow(merger->dataWriter, row); code = tsdbDataFileWriteRow(merger->dataWriter, row);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} else { } else {
if (merger->ctx->bData[pidx].nRow > 0) { if (merger->ctx->blockData[pidx].nRow > 0) {
code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->bData + cidx); code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (merger->ctx->bData[cidx].nRow < merger->minRow) { if (merger->ctx->blockData[cidx].nRow < merger->minRow) {
code = tsdbSttFileWriteBlockData(merger->sttWriter, merger->ctx->bData + cidx); code = tsdbSttFileWriteBlockData(merger->sttWriter, merger->ctx->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->bData + cidx); code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + cidx);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) { for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) {
tBlockDataReset(merger->ctx->bData + i); tBlockDataReset(merger->ctx->blockData + i);
} }
_exit: _exit:
...@@ -181,9 +181,9 @@ static int32_t tsdbMergeToDataTableBegin(SMerger *merger) { ...@@ -181,9 +181,9 @@ static int32_t tsdbMergeToDataTableBegin(SMerger *merger) {
code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb); code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
merger->ctx->bDataIdx = 0; merger->ctx->blockDataIdx = 0;
for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) { for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) {
code = tBlockDataInit(merger->ctx->bData + i, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0); code = tBlockDataInit(merger->ctx->blockData + i, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -213,27 +213,27 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) { ...@@ -213,27 +213,27 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) {
TSDBKEY key[1] = {TSDBROW_KEY(&row->row)}; TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};
if (key->version <= merger->compactVersion // if (key->version <= merger->compactVersion //
&& merger->ctx->bData[merger->ctx->bDataIdx].nRow > 0 // && merger->ctx->blockData[merger->ctx->blockDataIdx].nRow > 0 //
&& merger->ctx->bData[merger->ctx->bDataIdx].aTSKEY[merger->ctx->bData[merger->ctx->bDataIdx].nRow - 1] == && merger->ctx->blockData[merger->ctx->blockDataIdx]
key->ts) { .aTSKEY[merger->ctx->blockData[merger->ctx->blockDataIdx].nRow - 1] == key->ts) {
// update // update
code = tBlockDataUpdateRow(merger->ctx->bData + merger->ctx->bDataIdx, &row->row, NULL); code = tBlockDataUpdateRow(merger->ctx->blockData + merger->ctx->blockDataIdx, &row->row, NULL);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
if (merger->ctx->bData[merger->ctx->bDataIdx].nRow >= merger->maxRow) { if (merger->ctx->blockData[merger->ctx->blockDataIdx].nRow >= merger->maxRow) {
int32_t idx = (merger->ctx->bDataIdx + 1) % 2; int32_t idx = (merger->ctx->blockDataIdx + 1) % 2;
code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->bData + idx); code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + idx);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataClear(merger->ctx->bData + idx); tBlockDataClear(merger->ctx->blockData + idx);
// switch to next bData // switch to next bData
merger->ctx->bDataIdx = idx; merger->ctx->blockDataIdx = idx;
} }
code = tBlockDataAppendRow(merger->ctx->bData + merger->ctx->bDataIdx, &row->row, NULL, row->uid); code = tBlockDataAppendRow(merger->ctx->blockData + merger->ctx->blockDataIdx, &row->row, NULL, row->uid);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -510,9 +510,9 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { ...@@ -510,9 +510,9 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
merger->ctx->tbid->suid = 0; merger->ctx->tbid->suid = 0;
merger->ctx->tbid->uid = 0; merger->ctx->tbid->uid = 0;
merger->ctx->bDataIdx = 0; merger->ctx->blockDataIdx = 0;
for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); ++i) {
tBlockDataReset(merger->ctx->bData + i); tBlockDataReset(merger->ctx->blockData + i);
} }
// open reader // open reader
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册