From cbd5da5dee9250c154afc7bd88ee6f0a13b5fcf7 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 9 Jun 2023 16:07:06 +0800 Subject: [PATCH] more code --- .../dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c | 270 +++++++++--------- 1 file changed, 138 insertions(+), 132 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index cbc06042f0..a41a916362 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -491,13 +491,6 @@ struct SDataFileWriter { int32_t tombBlkArrayIdx; STombBlock tombBlock[1]; int32_t tombBlockIdx; - -#if 0 - const TBlockIdxArray *blockIdxArray; - int32_t blockIdxArrayIdx; - const TDataBlkArray *dataBlkArray; - int32_t dataBlkArrayIdx; -#endif } ctx[1]; STFile files[TSDB_FTYPE_MAX]; @@ -512,36 +505,8 @@ struct SDataFileWriter { TTombBlkArray tombBlkArray[1]; STombBlock tombBlock[1]; - -#if 0 - TBlockIdxArray blockIdxArray[1]; - TDataBlkArray dataBlkArray[1]; -#endif }; -#if 0 -static int32_t tsdbDataFileWriteBlockIdx(SDataFileWriter *writer) { - int32_t code = 0; - int32_t lino = 0; - - writer->headFooter->blockIdxPtr->offset = writer->files[TSDB_FTYPE_HEAD].size; - writer->headFooter->blockIdxPtr->size = TARRAY2_DATA_LEN(writer->blockIdxArray); - - if (writer->headFooter->blockIdxPtr->size) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->headFooter->blockIdxPtr->offset, - (void *)TARRAY2_DATA(writer->blockIdxArray), writer->headFooter->blockIdxPtr->size); - TSDB_CHECK_CODE(code, lino, _exit); - writer->files[TSDB_FTYPE_HEAD].size += writer->headFooter->blockIdxPtr->size; - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); - } - return code; -} -#endif - static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) { ASSERT(0); return 0; @@ -553,14 +518,14 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) { } tTombBlockDestroy(writer->tombBlock); - // tStatisBlockDestroy(writer->statisBlock); - tBlockDataDestroy(writer->blockData); TARRAY2_DESTROY(writer->tombBlkArray, NULL); -#if 0 - TARRAY2_DESTROY(writer->dataBlkArray, NULL); - TARRAY2_DESTROY(writer->blockIdxArray, NULL); -#endif + tBlockDataDestroy(writer->blockData); + tBrinBlockDestroy(writer->brinBlock); + TARRAY2_DESTROY(writer->brinBlkArray, NULL); + tTombBlockDestroy(writer->ctx->tombBlock); + tBlockDataDestroy(writer->ctx->blockData); + tBrinBlockDestroy(writer->ctx->brinBlock); for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) { tFree(writer->bufArr[i]); @@ -894,11 +859,6 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA TSDB_CHECK_CODE(code, lino, _exit); writer->files[ftype].size += blockIdx->size; -#if 0 - code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx); - TSDB_CHECK_CODE(code, lino, _exit); -#endif - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); @@ -947,101 +907,116 @@ _exit: return code; } -static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) { +static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TSDBKEY *key) { + if (writer->ctx->tbHasOldData == false) return 0; + int32_t code = 0; int32_t lino = 0; - if (writer->ctx->tbHasOldData) { - TSDBKEY key[1]; - if (row->type == TSDBROW_ROW_FMT) { - key->ts = row->pTSRow->ts; - key->version = row->version; - } else { - key->ts = row->pBlockData->aTSKEY[row->iRow]; - key->version = row->pBlockData->aVersion[row->iRow]; - } - + for (;;) { for (;;) { - for (;;) { - // SBlockData - for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { - if (key->ts < writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] // - || (key->ts == writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] && - key->version < writer->ctx->blockData->aVersion[writer->ctx->blockDataIdx])) { - goto _do_write; - } else { - TSDBROW row1 = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); - code = tsdbDataFileDoWriteTSRow(writer, &row1); - TSDB_CHECK_CODE(code, lino, _exit); - } + // SBlockData + for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { + if (key->ts < writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] // + || (key->ts == writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] && + key->version < writer->ctx->blockData->aVersion[writer->ctx->blockDataIdx])) { + goto _exit; + } else { + TSDBROW row1 = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); + code = tsdbDataFileDoWriteTSRow(writer, &row1); + TSDB_CHECK_CODE(code, lino, _exit); } + } + + // SBrinBlock + if (writer->ctx->brinBlockIdx >= BRIN_BLOCK_SIZE(writer->ctx->brinBlock)) { + break; + } - // SBrinBlock - if (writer->ctx->brinBlockIdx >= BRIN_BLOCK_SIZE(writer->ctx->brinBlock)) { - break; + for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) { + if (TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx) != writer->ctx->tbid->uid) { + writer->ctx->tbHasOldData = false; + goto _exit; } - for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) { - if (TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx) != writer->ctx->tbid->uid) { - writer->ctx->tbHasOldData = false; - goto _do_write; - } + if (key->ts < TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) // + || (key->ts == TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) && + key->version < TARRAY2_GET(writer->ctx->brinBlock->firstKeyVer, writer->ctx->brinBlockIdx))) { + goto _exit; + } else { + SBrinRecord record[1]; + tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record); + if (key->ts > TARRAY2_GET(writer->ctx->brinBlock->lastKey, writer->ctx->brinBlockIdx) // + || (key->ts == TARRAY2_GET(writer->ctx->brinBlock->lastKey, writer->ctx->brinBlockIdx) && + key->version > TARRAY2_GET(writer->ctx->brinBlock->lastKeyVer, writer->ctx->brinBlockIdx))) { + if (writer->blockData->nRow > 0) { + code = tsdbDataFileDoWriteBlockData(writer, writer->blockData); + TSDB_CHECK_CODE(code, lino, _exit); + } - if (key->ts < TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) // - || (key->ts == TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) && - key->version < TARRAY2_GET(writer->ctx->brinBlock->firstKeyVer, writer->ctx->brinBlockIdx))) { - goto _do_write; + code = tsdbDataFileWriteBrinRecord(writer, record); + TSDB_CHECK_CODE(code, lino, _exit); } else { - SBrinRecord record[1]; - tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record); - if (key->ts > TARRAY2_GET(writer->ctx->brinBlock->lastKey, writer->ctx->brinBlockIdx) // - || (key->ts == TARRAY2_GET(writer->ctx->brinBlock->lastKey, writer->ctx->brinBlockIdx) && - key->version > TARRAY2_GET(writer->ctx->brinBlock->lastKeyVer, writer->ctx->brinBlockIdx))) { - if (writer->blockData->nRow > 0) { - code = tsdbDataFileDoWriteBlockData(writer, writer->blockData); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbDataFileWriteBrinRecord(writer, record); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDataFileReadBlockData(writer->ctx->reader, record, writer->ctx->blockData); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbDataFileReadBlockData(writer->ctx->reader, record, writer->ctx->blockData); + TSDB_CHECK_CODE(code, lino, _exit); - writer->ctx->blockDataIdx = 0; - writer->ctx->brinBlockIdx++; - break; - } + writer->ctx->blockDataIdx = 0; + writer->ctx->brinBlockIdx++; + break; } } } + } + + // SBrinBlk + if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) { + writer->ctx->brinBlkArray = NULL; + writer->ctx->tbHasOldData = false; + goto _exit; + } + + for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) { + const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx); - // SBrinBlk - if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) { - writer->ctx->brinBlkArray = NULL; + if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) { writer->ctx->tbHasOldData = false; - goto _do_write; + goto _exit; } - for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) { - const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx); + code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock); + TSDB_CHECK_CODE(code, lino, _exit); - if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) { - writer->ctx->tbHasOldData = false; - goto _do_write; - } + writer->ctx->brinBlockIdx = 0; + writer->ctx->brinBlkArrayIdx++; + break; + } + } - code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock); - TSDB_CHECK_CODE(code, lino, _exit); +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} - writer->ctx->brinBlockIdx = 0; - writer->ctx->brinBlkArrayIdx++; - break; - } +static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) { + int32_t code = 0; + int32_t lino = 0; + + if (writer->ctx->tbHasOldData) { + TSDBKEY key[1]; + if (row->type == TSDBROW_ROW_FMT) { + key->ts = row->pTSRow->ts; + key->version = row->version; + } else { + key->ts = row->pBlockData->aTSKEY[row->iRow]; + key->version = row->pBlockData->aVersion[row->iRow]; } + + code = tsdbDataFileDoWriteTableOldData(writer, key); + TSDB_CHECK_CODE(code, lino, _exit); } -_do_write: code = tsdbDataFileDoWriteTSRow(writer, row); TSDB_CHECK_CODE(code, lino, _exit); @@ -1059,11 +1034,15 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { int32_t lino = 0; if (writer->ctx->tbHasOldData) { - for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { - TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); - code = tsdbDataFileDoWriteTSRow(writer, &row); - TSDB_CHECK_CODE(code, lino, _exit); - } + TSDBKEY key = { + .ts = TSKEY_MAX, + .version = VERSION_MAX, + }; + + code = tsdbDataFileDoWriteTableOldData(writer, &key); + TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(writer->ctx->tbHasOldData == false); } code = tsdbDataFileDoWriteBlockData(writer, writer->blockData); @@ -1152,10 +1131,11 @@ static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, - (const uint8_t *)writer->headFooter, sizeof(SHeadFooter)); + int32_t ftype = TSDB_FTYPE_HEAD; + code = tsdbWriteFile(writer->fd[ftype], writer->files[ftype].size, (const uint8_t *)writer->headFooter, + sizeof(SHeadFooter)); TSDB_CHECK_CODE(code, lino, _exit); - writer->files[TSDB_FTYPE_HEAD].size += sizeof(SHeadFooter); + writer->files[ftype].size += sizeof(SHeadFooter); _exit: if (code) { @@ -1327,6 +1307,28 @@ _exit: return code; } +static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) { + ASSERT(TARRAY2_SIZE(writer->brinBlkArray) > 0); + + int32_t code = 0; + int32_t lino = 0; + + int32_t ftype = TSDB_FTYPE_HEAD; + writer->headFooter->brinBlkPtr->offset = writer->files[ftype].size; + writer->headFooter->brinBlkPtr->size = TARRAY2_DATA_LEN(writer->brinBlkArray); + + code = tsdbWriteFile(writer->fd[ftype], writer->headFooter->brinBlkPtr->offset, + (uint8_t *)TARRAY2_DATA(writer->brinBlkArray), writer->headFooter->brinBlkPtr->size); + TSDB_CHECK_CODE(code, lino, _exit); + writer->files[ftype].size += writer->headFooter->brinBlkPtr->size; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) { int32_t code = 0; int32_t lino = 0; @@ -1346,10 +1348,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr code = tsdbDataFileWriteTableDataBegin(writer, tbid); TSDB_CHECK_CODE(code, lino, _exit); -#if 0 - code = tsdbDataFileWriteBlockIdx(writer); + code = tsdbDataFileWriteBrinBlk(writer); TSDB_CHECK_CODE(code, lino, _exit); -#endif code = tsdbDataFileWriteHeadFooter(writer); TSDB_CHECK_CODE(code, lino, _exit); @@ -1502,10 +1502,6 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) { if (writer->ctx->reader) { code = tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray); TSDB_CHECK_CODE(code, lino, _exit); -#if 0 - code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray); - TSDB_CHECK_CODE(code, lino, _exit); -#endif } _exit: @@ -1605,6 +1601,16 @@ int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) { TSDB_CHECK_CODE(code, lino, _exit); } + if (writer->ctx->tbHasOldData) { + TSDBKEY key = { + .ts = bData->aTSKEY[0], + .version = bData->aVersion[0], + }; + + code = tsdbDataFileDoWriteTableOldData(writer, &key); + TSDB_CHECK_CODE(code, lino, _exit); + } + if (!writer->ctx->tbHasOldData // && writer->blockData->nRow == 0 // ) { -- GitLab