From 3753c7d602863be64bf5a89d0b647a4ee4046cf1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 9 Jun 2023 09:23:56 +0800 Subject: [PATCH] refact more --- .../dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c | 356 ++++++++++++------ 1 file changed, 241 insertions(+), 115 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index cf5b93a7b6..dc16b5803a 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -474,36 +474,52 @@ struct SDataFileWriter { struct { bool opened; SDataFileReader *reader; + // for ts data - const TBlockIdxArray *blockIdxArray; - int32_t blockIdxArrayIdx; - bool tbHasOldData; - const TDataBlkArray *dataBlkArray; - int32_t dataBlkArrayIdx; - SBlockData bData[1]; - int32_t iRow; - TABLEID tbid[1]; + TABLEID tbid[1]; + bool tbHasOldData; + + const TBrinBlkArray *brinBlkArray; + int32_t brinBlkArrayIdx; + SBrinBlock brinBlock[1]; + int32_t brinBlockIdx; + SBlockData blockData[1]; + int32_t blockDataIdx; // for tomb data bool hasOldTomb; const TTombBlkArray *tombBlkArray; int32_t tombBlkArrayIdx; - STombBlock tData[1]; - int32_t iRowTomb; + STombBlock tombBlock[1]; + int32_t tombBlockIdx; + +#if 0 + const TBlockIdxArray *blockIdxArray; + int32_t blockIdxArrayIdx; + const TDataBlkArray *dataBlkArray; + int32_t dataBlkArrayIdx; +#endif } ctx[1]; STFile files[TSDB_FTYPE_MAX]; STsdbFD *fd[TSDB_FTYPE_MAX]; - SHeadFooter headFooter[1]; - STombFooter tombFooter[1]; + SHeadFooter headFooter[1]; + STombFooter tombFooter[1]; + + TBrinBlkArray brinBlkArray[1]; + SBrinBlock brinBlock[1]; + SBlockData blockData[1]; + + TTombBlkArray tombBlkArray[1]; + STombBlock tombBlock[1]; + +#if 0 TBlockIdxArray blockIdxArray[1]; TDataBlkArray dataBlkArray[1]; - TTombBlkArray tombBlkArray[1]; - SBlockData bData[1]; - STbStatisBlock sData[1]; - STombBlock tData[1]; +#endif }; +#if 0 static int32_t tsdbDataFileWriteBlockIdx(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; @@ -524,6 +540,7 @@ _exit: } return code; } +#endif static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) { ASSERT(0); @@ -535,13 +552,15 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) { tsdbDataFileReaderClose(&writer->ctx->reader); } - tTombBlockDestroy(writer->tData); - tStatisBlockDestroy(writer->sData); - tBlockDataDestroy(writer->bData); + tTombBlockDestroy(writer->tombBlock); + // tStatisBlockDestroy(writer->statisBlock); + tBlockDataDestroy(writer->blockData); TARRAY2_DESTROY(writer->tombBlkArray, NULL); +#if 0 TARRAY2_DESTROY(writer->dataBlkArray, NULL); TARRAY2_DESTROY(writer->blockIdxArray, NULL); - tTombBlockDestroy(writer->ctx->tData); +#endif + tTombBlockDestroy(writer->ctx->tombBlock); for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) { tFree(writer->bufArr[i]); @@ -565,7 +584,9 @@ static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) { for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { config->files[i].exist = writer->config->files[i].exist; - config->files[i].file = writer->config->files[i].file; + if (config->files[i].exist) { + config->files[i].file = writer->config->files[i].file; + } } code = tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader); @@ -584,6 +605,7 @@ _exit: static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; + int32_t ftype; if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb; if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow; @@ -593,7 +615,6 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { code = tsdbDataFileWriterDoOpenReader(writer); TSDB_CHECK_CODE(code, lino, _exit); - int32_t ftype; // .head ftype = TSDB_FTYPE_HEAD; writer->files[ftype] = (STFile){ @@ -735,9 +756,11 @@ static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *b TARRAY2_DESTROY(smaArr, NULL); +#if 0 // to dataBlkArray code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); TSDB_CHECK_CODE(code, lino, _exit); +#endif tBlockDataClear(bData); @@ -767,8 +790,10 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA TSDB_CHECK_CODE(code, lino, _exit); writer->files[ftype].size += blockIdx->size; +#if 0 code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx); TSDB_CHECK_CODE(code, lino, _exit); +#endif _exit: if (code) { @@ -788,19 +813,19 @@ static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) { } TSDBKEY key[1] = {TSDBROW_KEY(row)}; - if (key->version <= writer->config->compactVersion // - && writer->bData->nRow > 0 // - && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts // + if (key->version <= writer->config->compactVersion // + && writer->blockData->nRow > 0 // + && writer->blockData->aTSKEY[writer->blockData->nRow - 1] == key->ts // ) { - code = tBlockDataUpdateRow(writer->bData, row, writer->config->skmRow->pTSchema); + code = tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema); TSDB_CHECK_CODE(code, lino, _exit); } else { - if (writer->bData->nRow >= writer->config->maxRow) { - code = tsdbDataFileWriteDataBlock(writer, writer->bData); + if (writer->blockData->nRow >= writer->config->maxRow) { + code = tsdbDataFileWriteDataBlock(writer, writer->blockData); TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(writer->bData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid); + code = tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid); TSDB_CHECK_CODE(code, lino, _exit); } @@ -816,8 +841,8 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) int32_t lino = 0; while (writer->ctx->tbHasOldData) { - for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) { - TSDBROW row1[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)}; + for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { + TSDBROW row1[1] = {tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx)}; int32_t c = tsdbRowCmprFn(row, row1); ASSERT(c); @@ -829,6 +854,7 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) } } +#if 0 if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray)) { writer->ctx->tbHasOldData = false; break; @@ -845,7 +871,7 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) int32_t c = tDataBlkCmprFn(dataBlk, dataBlk1); if (c < 0) { - code = tsdbDataFileWriteDataBlock(writer, writer->bData); + code = tsdbDataFileWriteDataBlock(writer, writer->blockData); TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); @@ -853,14 +879,15 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) } else if (c > 0) { goto _do_write; } else { - code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->bData); + code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->blockData); TSDB_CHECK_CODE(code, lino, _exit); - writer->ctx->iRow = 0; + writer->ctx->blockDataIdx = 0; writer->ctx->dataBlkArrayIdx++; break; } } +#endif } _do_write: @@ -875,38 +902,124 @@ _exit: } static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { - if (!writer->ctx->tbid->uid) return 0; + if (writer->ctx->tbid->uid == 0) return 0; int32_t code = 0; int32_t lino = 0; - // handle table remain data if (writer->ctx->tbHasOldData) { - for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) { - TSDBROW row[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)}; - - code = tsdbDataFileDoWriteTSRow(writer, row); + for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { + TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); + code = tsdbDataFileDoWriteTSRow(writer, &row); TSDB_CHECK_CODE(code, lino, _exit); } + } - code = tsdbDataFileWriteDataBlock(writer, writer->bData); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbDataFileWriteDataBlock(writer, writer->blockData); + TSDB_CHECK_CODE(code, lino, _exit); - for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) { - code = TARRAY2_APPEND_PTR(writer->dataBlkArray, - TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx)); - TSDB_CHECK_CODE(code, lino, _exit); +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { + if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0; + + int32_t code = 0; + int32_t lino = 0; + + // get SBrinBlk + SBrinBlk brinBlk[1] = { + { + .dp[0] = + { + .offset = writer->files[TSDB_FTYPE_HEAD].size, + .size = 0, + }, + .minTbid = + { + .suid = TARRAY2_FIRST(writer->brinBlock->suid), + .uid = TARRAY2_FIRST(writer->brinBlock->uid), + }, + .maxTbid = + { + .suid = TARRAY2_LAST(writer->brinBlock->suid), + .uid = TARRAY2_LAST(writer->brinBlock->uid), + }, + .minVer = TARRAY2_FIRST(writer->brinBlock->minVer), + .maxVer = TARRAY2_FIRST(writer->brinBlock->minVer), + .numRec = BRIN_BLOCK_SIZE(writer->brinBlock), + .cmprAlg = writer->config->cmprAlg, + }, + }; + + for (int32_t i = 1; i < BRIN_BLOCK_SIZE(writer->brinBlock); i++) { + if (brinBlk->minVer > TARRAY2_GET(writer->brinBlock->minVer, i)) { + brinBlk->minVer = TARRAY2_GET(writer->brinBlock->minVer, i); } + if (brinBlk->maxVer < TARRAY2_GET(writer->brinBlock->maxVer, i)) { + brinBlk->maxVer = TARRAY2_GET(writer->brinBlock->maxVer, i); + } + } - writer->ctx->tbHasOldData = false; + // write to file + for (int32_t i = 0; i < ARRAY_SIZE(writer->brinBlock->dataArr1); i++) { + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr1 + i), + TARRAY2_DATA_LEN(writer->brinBlock->dataArr1 + i), TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, + &writer->config->bufArr[0], 0, &brinBlk->size[i], &writer->config->bufArr[1]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0], + brinBlk->size[i]); + TSDB_CHECK_CODE(code, lino, _exit); + + brinBlk->dp[i].size += brinBlk->size[i]; + writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[i]; + } + + for (int32_t i = 0, j = ARRAY_SIZE(writer->brinBlock->dataArr1); i < ARRAY_SIZE(writer->brinBlock->dataArr2); + i++, j++) { + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr2 + i), + TARRAY2_DATA_LEN(writer->brinBlock->dataArr2 + i), TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, + &writer->config->bufArr[0], 0, &brinBlk->size[j], &writer->config->bufArr[1]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0], + brinBlk->size[j]); + TSDB_CHECK_CODE(code, lino, _exit); + + brinBlk->dp[i].size += brinBlk->size[j]; + writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[j]; } - code = tsdbDataFileWriteDataBlock(writer, writer->bData); + // append to brinBlkArray + code = TARRAY2_APPEND_PTR(writer->brinBlkArray, brinBlk); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbDataFileWriteDataBlk(writer, writer->dataBlkArray); + tBrinBlockClear(writer->brinBlock); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinRecord *record) { + int32_t code = 0; + int32_t lino = 0; + + code = tBrinBlockPut(writer->brinBlock, record); TSDB_CHECK_CODE(code, lino, _exit); + if (BRIN_BLOCK_SIZE(writer->brinBlock) >= writer->config->maxRow) { + code = tsdbDataFileWriteBrinBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); @@ -919,57 +1032,64 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA int32_t lino = 0; SMetaInfo info; + ASSERT(writer->ctx->blockDataIdx == writer->ctx->blockData->nRow); + ASSERT(writer->blockData->nRow == 0); + writer->ctx->tbHasOldData = false; + while (writer->ctx->brinBlkArray) { // skip data of previous table + for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) { + // skip removed table + int64_t uid = TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx); + if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, uid, &info, NULL) == TSDB_CODE_NOT_FOUND) { + for (int32_t idx = writer->ctx->brinBlockIdx + 1; // + idx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock) // + && uid == TARRAY2_GET(writer->ctx->brinBlock->uid, idx); + idx++, writer->ctx->brinBlockIdx++) { + } + continue; + } - // skip data of previous table - if (writer->ctx->blockIdxArray) { - for (; writer->ctx->blockIdxArrayIdx < TARRAY2_SIZE(writer->ctx->blockIdxArray); writer->ctx->blockIdxArrayIdx++) { - const SBlockIdx *blockIdx = TARRAY2_GET_PTR(writer->ctx->blockIdxArray, writer->ctx->blockIdxArrayIdx); + SBrinRecord record[1]; + tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record); - int32_t c = tTABLEIDCmprFn(blockIdx, tbid); + int32_t c = tTABLEIDCmprFn(record, tbid); if (c < 0) { - if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, blockIdx->uid, &info, NULL) == 0) { - code = tsdbDataFileReadDataBlk(writer->ctx->reader, blockIdx, &writer->ctx->dataBlkArray); - TSDB_CHECK_CODE(code, lino, _exit); - - writer->ctx->tbid->suid = blockIdx->suid; - writer->ctx->tbid->uid = blockIdx->uid; - - code = tsdbDataFileWriteDataBlk(writer, writer->ctx->dataBlkArray); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - continue; - } + code = tsdbDataFileWriteBrinRecord(writer, record); + TSDB_CHECK_CODE(code, lino, _exit); } else { if (c == 0) { writer->ctx->tbHasOldData = true; + } + goto _begin; + } + } - code = tsdbDataFileReadDataBlk(writer->ctx->reader, blockIdx, &writer->ctx->dataBlkArray); - TSDB_CHECK_CODE(code, lino, _exit); + if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) { + writer->ctx->brinBlkArray = NULL; + break; + } - writer->ctx->dataBlkArrayIdx = 0; + for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) { + const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx); - tBlockDataReset(writer->ctx->bData); - writer->ctx->iRow = 0; + code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock); + TSDB_CHECK_CODE(code, lino, _exit); - writer->ctx->blockIdxArrayIdx++; - } - break; - } + writer->ctx->brinBlockIdx = 0; + writer->ctx->brinBlkArrayIdx++; + break; } } - // make sure state is correct - writer->ctx->tbid[0] = tbid[0]; - - if (tbid->suid == INT64_MAX && tbid->uid == INT64_MAX) goto _exit; +_begin: + writer->ctx->tbid[0] = *tbid; - TARRAY2_CLEAR(writer->dataBlkArray, NULL); + if (tbid->uid == INT64_MAX) goto _exit; code = tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb); TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataInit(writer->bData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0); + code = tBlockDataInit(writer->blockData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -996,25 +1116,25 @@ _exit: } static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { - if (TOMB_BLOCK_SIZE(writer->tData) == 0) return 0; + if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0; int32_t code = 0; int32_t lino = 0; STombBlk tombBlk[1] = {{ - .numRec = TOMB_BLOCK_SIZE(writer->tData), + .numRec = TOMB_BLOCK_SIZE(writer->tombBlock), .minTbid = { - .suid = TARRAY2_FIRST(writer->tData->suid), - .uid = TARRAY2_FIRST(writer->tData->uid), + .suid = TARRAY2_FIRST(writer->tombBlock->suid), + .uid = TARRAY2_FIRST(writer->tombBlock->uid), }, .maxTbid = { - .suid = TARRAY2_LAST(writer->tData->suid), - .uid = TARRAY2_LAST(writer->tData->uid), + .suid = TARRAY2_LAST(writer->tombBlock->suid), + .uid = TARRAY2_LAST(writer->tombBlock->uid), }, - .minVer = TARRAY2_FIRST(writer->tData->version), - .maxVer = TARRAY2_FIRST(writer->tData->version), + .minVer = TARRAY2_FIRST(writer->tombBlock->version), + .maxVer = TARRAY2_FIRST(writer->tombBlock->version), .dp[0] = { .offset = writer->files[TSDB_FTYPE_TOMB].size, @@ -1022,15 +1142,15 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { }, }}; - for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tData); i++) { - tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->tData->version, i)); - tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->tData->version, i)); + for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) { + tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->tombBlock->version, i)); + tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->tombBlock->version, i)); } - for (int32_t i = 0; i < ARRAY_SIZE(writer->tData->dataArr); i++) { + for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) { int32_t size; - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tData->dataArr[i]), - TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]), + TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); @@ -1046,7 +1166,7 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk); TSDB_CHECK_CODE(code, lino, _exit); - tTombBlockClear(writer->tData); + tTombBlockClear(writer->tombBlock); _exit: if (code) { @@ -1098,23 +1218,23 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom int32_t lino = 0; while (writer->ctx->hasOldTomb) { - for (; writer->ctx->iRowTomb < TOMB_BLOCK_SIZE(writer->ctx->tData); writer->ctx->iRowTomb++) { + for (; writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock); writer->ctx->tombBlockIdx++) { STombRecord record1[1] = {{ - .suid = TARRAY2_GET(writer->ctx->tData->suid, writer->ctx->iRowTomb), - .uid = TARRAY2_GET(writer->ctx->tData->uid, writer->ctx->iRowTomb), - .version = TARRAY2_GET(writer->ctx->tData->version, writer->ctx->iRowTomb), - .skey = TARRAY2_GET(writer->ctx->tData->skey, writer->ctx->iRowTomb), - .ekey = TARRAY2_GET(writer->ctx->tData->ekey, writer->ctx->iRowTomb), + .suid = TARRAY2_GET(writer->ctx->tombBlock->suid, writer->ctx->tombBlockIdx), + .uid = TARRAY2_GET(writer->ctx->tombBlock->uid, writer->ctx->tombBlockIdx), + .version = TARRAY2_GET(writer->ctx->tombBlock->version, writer->ctx->tombBlockIdx), + .skey = TARRAY2_GET(writer->ctx->tombBlock->skey, writer->ctx->tombBlockIdx), + .ekey = TARRAY2_GET(writer->ctx->tombBlock->ekey, writer->ctx->tombBlockIdx), }}; int32_t c = tTombRecordCompare(record, record1); if (c < 0) { break; } else if (c > 0) { - code = tTombBlockPut(writer->tData, record1); + code = tTombBlockPut(writer->tombBlock, record1); TSDB_CHECK_CODE(code, lino, _exit); - if (TOMB_BLOCK_SIZE(writer->tData) >= writer->config->maxRow) { + if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) { code = tsdbDataFileDoWriteTombBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1131,10 +1251,10 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom for (; writer->ctx->tombBlkArrayIdx < TARRAY2_SIZE(writer->ctx->tombBlkArray); ++writer->ctx->tombBlkArrayIdx) { const STombBlk *tombBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx); - code = tsdbDataFileReadTombBlock(writer->ctx->reader, tombBlk, writer->ctx->tData); + code = tsdbDataFileReadTombBlock(writer->ctx->reader, tombBlk, writer->ctx->tombBlock); TSDB_CHECK_CODE(code, lino, _exit); - writer->ctx->iRowTomb = 0; + writer->ctx->tombBlockIdx = 0; writer->ctx->tombBlkArrayIdx++; break; } @@ -1143,10 +1263,10 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom _write: if (record->suid == INT64_MAX) goto _exit; - code = tTombBlockPut(writer->tData, record); + code = tTombBlockPut(writer->tombBlock, record); TSDB_CHECK_CODE(code, lino, _exit); - if (TOMB_BLOCK_SIZE(writer->tData) >= writer->config->maxRow) { + if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) { code = tsdbDataFileDoWriteTombBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1177,8 +1297,10 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr code = tsdbDataFileWriteTableDataBegin(writer, tbid); TSDB_CHECK_CODE(code, lino, _exit); +#if 0 code = tsdbDataFileWriteBlockIdx(writer); TSDB_CHECK_CODE(code, lino, _exit); +#endif code = tsdbDataFileWriteHeadFooter(writer); TSDB_CHECK_CODE(code, lino, _exit); @@ -1329,8 +1451,12 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) { } if (writer->ctx->reader) { + code = tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray); + TSDB_CHECK_CODE(code, lino, _exit); +#if 0 code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray); TSDB_CHECK_CODE(code, lino, _exit); +#endif } _exit: @@ -1341,7 +1467,7 @@ _exit: } int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) { - writer[0] = taosMemoryCalloc(1, sizeof(SDataFileWriter)); + writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY; writer[0]->config[0] = config[0]; @@ -1430,8 +1556,8 @@ int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) TSDB_CHECK_CODE(code, lino, _exit); } - if (!writer->ctx->tbHasOldData // - && writer->bData->nRow == 0 // + if (!writer->ctx->tbHasOldData // + && writer->blockData->nRow == 0 // ) { code = tsdbDataFileWriteDataBlock(writer, bData); TSDB_CHECK_CODE(code, lino, _exit); @@ -1453,10 +1579,10 @@ _exit: int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer) { ASSERT(writer->ctx->opened); - if (writer->bData->nRow == 0) return 0; + if (writer->blockData->nRow == 0) return 0; if (writer->ctx->tbHasOldData) return 0; - return tsdbDataFileWriteDataBlock(writer, writer->bData); + return tsdbDataFileWriteDataBlock(writer, writer->blockData); } static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) { @@ -1488,8 +1614,8 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) { } writer->ctx->tombBlkArrayIdx = 0; - tTombBlockClear(writer->ctx->tData); - writer->ctx->iRowTomb = 0; + tTombBlockClear(writer->ctx->tombBlock); + writer->ctx->tombBlockIdx = 0; } _exit: -- GitLab