/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "inc/tsdbDataFileRW.h"

// Fixed-size trailer stored at the very end of a .head file. It is read and
// written as raw bytes, so the field layout must not change.
typedef struct {
  SFDataPtr brinBlkPtr[1];
#if 1
  SFDataPtr blockIdxPtr[1];
#endif
  SFDataPtr rsrvd[2];
} SHeadFooter;

// Fixed-size trailer stored at the very end of a .tomb file (raw bytes as well).
typedef struct {
  SFDataPtr tombBlkPtr[1];
  SFDataPtr rsrvd[2];
} STombFooter;

// SDataFileReader =============================================
struct SDataFileReader {
  SDataFileReaderConfig config[1];

  // Scratch buffers owned by the reader; used when config->bufArr is NULL.
  uint8_t *bufArr[5];

  // Lazy-load flags for the cached footers / block arrays below.
  struct {
    bool headFooterLoaded;
    bool tombFooterLoaded;
    bool brinBlkLoaded;
    bool tombBlkLoaded;
#if 1
    TABLEID tbid[1];
    bool    blockIdxLoaded;
#endif
  } ctx[1];

  STsdbFD *fd[TSDB_FTYPE_MAX];

  SHeadFooter   headFooter[1];
  STombFooter   tombFooter[1];
  TBrinBlkArray brinBlkArray[1];
  TTombBlkArray tombBlkArray[1];
#if 1
  TDataBlkArray  dataBlkArray[1];
  TBlockIdxArray blockIdxArray[1];
#endif
};

// Load the .head footer once. When no .head file is open the footer keeps its
// current content and is still marked as loaded.
static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
  if (reader->ctx->headFooterLoaded) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_HEAD;
  if (reader->fd[ftype]) {
    // The footer occupies the last sizeof(SHeadFooter) bytes of the file.
    code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(SHeadFooter),
                        (uint8_t *)reader->headFooter, sizeof(SHeadFooter));
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  reader->ctx->headFooterLoaded = true;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Load the .tomb footer once. When no .tomb file is open the footer keeps its
// current content and is still marked as loaded.
static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
  if (reader->ctx->tombFooterLoaded) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_TOMB;
  if (reader->fd[ftype]) {
    // The footer occupies the last sizeof(STombFooter) bytes of the file.
    code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(STombFooter),
                        (uint8_t *)reader->tombFooter, sizeof(STombFooter));
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  reader->ctx->tombFooterLoaded = true;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Create a reader and open its files.
//
// fname  : optional array of TSDB_FTYPE_MAX file names; NULL entries are
//          skipped. When fname itself is NULL the names are derived from
//          config->files[] for every entry flagged as existing.
// config : copied into the reader.
// reader : receives the new reader on success. On failure everything opened
//          so far is released and reader[0] is set to NULL.
//
// Returns 0 on success, otherwise an error code.
int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig *config, SDataFileReader **reader) {
  int32_t code = 0;
  int32_t lino = 0;

  reader[0] = taosMemoryCalloc(1, sizeof(**reader));
  if (reader[0] == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  reader[0]->config[0] = config[0];
  if (reader[0]->config->bufArr == NULL) {
    // No external scratch buffers supplied: fall back to the reader's own.
    reader[0]->config->bufArr = reader[0]->bufArr;
  }

  if (fname) {
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
      if (fname[i]) {
        code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  } else {
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
      if (config->files[i].exist) {
        char fname1[TSDB_FILENAME_LEN];
        tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
        code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);

    // Do not hand a half-constructed reader back to the caller: close the
    // files opened so far and release the object itself.
    if (reader[0]) {
      for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
        if (reader[0]->fd[i]) {
          tsdbCloseFile(&reader[0]->fd[i]);
        }
      }
      taosMemoryFree(reader[0]);
      reader[0] = NULL;
    }
  }
  return code;
}

// Destroy a reader: release the cached arrays, close every open file, free the
// reader-owned scratch buffers and set reader[0] to NULL. A NULL reader[0] is
// accepted and ignored.
int32_t tsdbDataFileReaderClose(SDataFileReader **reader) {
  if (reader[0] == NULL) return 0;

  TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
  TARRAY2_DESTROY(reader[0]->brinBlkArray, NULL);

#if 1
  TARRAY2_DESTROY(reader[0]->dataBlkArray, NULL);
  TARRAY2_DESTROY(reader[0]->blockIdxArray, NULL);
#endif

  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
    if (reader[0]->fd[i]) {
      tsdbCloseFile(&reader[0]->fd[i]);
    }
  }

  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) {
    tFree(reader[0]->bufArr[i]);
  }

  taosMemoryFree(reader[0]);
  reader[0] = NULL;
  return 0;
}

// Return the SBrinBlk array of the .head file, loading it on first use.
// brinBlkArray[0] points at storage owned by the reader.
int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **brinBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!reader->ctx->brinBlkLoaded) {
    code = tsdbDataFileReadHeadFooter(reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (reader->headFooter->brinBlkPtr->size > 0) {
      void *data = taosMemoryMalloc(reader->headFooter->brinBlkPtr->size);
      if (data == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data,
                          reader->headFooter->brinBlkPtr->size);
      if (code) {
        taosMemoryFree(data);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // The raw buffer is handed over to the array, which now owns it.
      int32_t size = reader->headFooter->brinBlkPtr->size / sizeof(SBrinBlk);
      TARRAY2_INIT_EX(reader->brinBlkArray, size, size, data);
    } else {
      TARRAY2_INIT(reader->brinBlkArray);
    }

    reader->ctx->brinBlkLoaded = true;
  }
  brinBlkArray[0] = reader->brinBlkArray;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Read the block described by brinBlk from the .head file and decompress its
// columns into brinBlock (which is cleared first). The 64-bit columns
// (dataArr1) are stored first, followed by the 32-bit columns (dataArr2).
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tRealloc(&reader->config->bufArr[0], brinBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t size = 0;  // bytes of bufArr[0] consumed so far
  tBrinBlockClear(brinBlock);
  for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
    code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[i], TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg,
                          &reader->config->bufArr[1], brinBlk->numRec * sizeof(int64_t), &reader->config->bufArr[2]);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr1[i], reader->config->bufArr[1], brinBlk->numRec);
    TSDB_CHECK_CODE(code, lino, _exit);

    size += brinBlk->size[i];
  }

  // j continues the index into brinBlk->size[] after the dataArr1 columns.
  for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) {
    code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[j], TSDB_DATA_TYPE_INT, brinBlk->cmprAlg,
                          &reader->config->bufArr[1], brinBlk->numRec * sizeof(int32_t), &reader->config->bufArr[2]);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr2[i], reader->config->bufArr[1], brinBlk->numRec);
    TSDB_CHECK_CODE(code, lino, _exit);

    size += brinBlk->size[j];
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Read the whole data block referenced by record from the .data file and
// decompress it into bData.
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tRealloc(&reader->config->bufArr[0], record->blockSize);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Read only the columns listed in cids[0..ncid) of the data block referenced
// by record. The key part (header + version + timestamp) is always read;
// requested columns that are absent from the block are filled with NONE.
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
                                          STSchema *pTSchema, int16_t cids[], int32_t ncid) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tBlockDataInit(bData, (TABLEID *)record, pTSchema, cids, ncid);
  TSDB_CHECK_CODE(code, lino, _exit);

  // uid + version + tskey
  code = tRealloc(&reader->config->bufArr[0], record->blockKeySize);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0],
                      record->blockKeySize);
  TSDB_CHECK_CODE(code, lino, _exit);

  // hdr
  SDiskDataHdr hdr[1];
  int32_t      size = 0;

  size += tGetDiskDataHdr(reader->config->bufArr[0] + size, hdr);

  ASSERT(hdr->delimiter == TSDB_FILE_DLMT);
  ASSERT(record->uid == hdr->uid);

  bData->nRow = hdr->nRow;

  // uid
  ASSERT(hdr->uid);

  // version
  code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
                        (uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);
  size += hdr->szVer;

  // ts
  code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg,
                        (uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);
  size += hdr->szKey;

  ASSERT(size == record->blockKeySize);

  // other columns
  if (bData->nColData > 0) {
    if (hdr->szBlkCol > 0) {
      // Load the SBlockCol directory that follows the key part.
      code = tRealloc(&reader->config->bufArr[0], hdr->szBlkCol);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
                          reader->config->bufArr[0], hdr->szBlkCol);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SBlockCol  bc[1] = {{.cid = 0}};
    SBlockCol *blockCol = bc;  // current directory entry; NULL once the directory is exhausted

    size = 0;
    for (int32_t i = 0; i < bData->nColData; i++) {
      SColData *colData = tBlockDataGetColDataByIdx(bData, i);

      // Advance through the directory until it reaches (or passes) colData->cid.
      while (blockCol && blockCol->cid < colData->cid) {
        if (size < hdr->szBlkCol) {
          size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
        } else {
          ASSERT(size == hdr->szBlkCol);
          blockCol = NULL;
        }
      }

      if (blockCol == NULL || blockCol->cid > colData->cid) {
        // Column is not stored in this block.
        for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
          code = tColDataAppendValue(colData, &COL_VAL_NONE(colData->cid, colData->type));
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      } else {
        ASSERT(blockCol->type == colData->type);
        ASSERT(blockCol->flag && blockCol->flag != HAS_NONE);

        if (blockCol->flag == HAS_NULL) {
          // All-NULL column: nothing to read from the file.
          for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
            code = tColDataAppendValue(colData, &COL_VAL_NULL(blockCol->cid, blockCol->type));
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        } else {
          int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;

          code = tRealloc(&reader->config->bufArr[1], size1);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA],
                              record->blockOffset + record->blockKeySize + hdr->szBlkCol + blockCol->offset,
                              reader->config->bufArr[1], size1);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
                                   &reader->config->bufArr[2]);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Read the SMA entries referenced by record from the .sma file into
// columnDataAggArray (cleared first). The array stays empty when the record
// has no SMA data.
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
                                 TColumnDataAggArray *columnDataAggArray) {
  int32_t code = 0;
  int32_t lino = 0;

  TARRAY2_CLEAR(columnDataAggArray, NULL);
  if (record->smaSize > 0) {
    code = tRealloc(&reader->config->bufArr[0], record->smaSize);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbReadFile(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, reader->config->bufArr[0], record->smaSize);
    TSDB_CHECK_CODE(code, lino, _exit);

    // decode sma data
    int32_t size = 0;
    while (size < record->smaSize) {
      SColumnDataAgg sma[1];

      size += tGetColumnDataAgg(reader->config->bufArr[0] + size, sma);

      code = TARRAY2_APPEND_PTR(columnDataAggArray, sma);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    ASSERT(size == record->smaSize);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

#if 0
int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!reader->ctx->blockIdxLoaded) {
    code = tsdbDataFileReadHeadFooter(reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    TARRAY2_CLEAR(reader->blockIdxArray, NULL);
    if (reader->fd[TSDB_FTYPE_HEAD]  //
        && reader->headFooter->blockIdxPtr->size) {
      code = tRealloc(&reader->config->bufArr[0], reader->headFooter->blockIdxPtr->size);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->blockIdxPtr->offset,
                          reader->config->bufArr[0], reader->headFooter->blockIdxPtr->size);
      TSDB_CHECK_CODE(code, lino, _exit);

      int32_t size = reader->headFooter->blockIdxPtr->size / sizeof(SBlockIdx);
      for (int32_t i = 0; i < size; ++i) {
        code = TARRAY2_APPEND_PTR(reader->blockIdxArray, ((SBlockIdx *)reader->config->bufArr[0]) + i);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    reader->ctx->blockIdxLoaded = true;
  }

  blockIdxArray[0] = reader->blockIdxArray;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx,
                                const TDataBlkArray **dataBlkArray) {
  ASSERT(reader->ctx->headFooterLoaded);

  if (reader->ctx->tbid->suid == blockIdx->suid && reader->ctx->tbid->uid == blockIdx->uid) {
    dataBlkArray[0] = reader->dataBlkArray;
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  reader->ctx->tbid->suid = blockIdx->suid;
  reader->ctx->tbid->uid = blockIdx->uid;

  TARRAY2_CLEAR(reader->dataBlkArray, NULL);

  code = tRealloc(&reader->config->bufArr[0], blockIdx->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], blockIdx->offset, reader->config->bufArr[0], blockIdx->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t size = blockIdx->size / sizeof(SDataBlk);
  for (int32_t i = 0; i < size; ++i) {
    code = TARRAY2_APPEND_PTR(reader->dataBlkArray, ((SDataBlk *)reader->config->bufArr[0]) + i);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  dataBlkArray[0] = reader->dataBlkArray;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tRealloc(&reader->config->bufArr[0], dataBlk->aSubBlock->szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], dataBlk->aSubBlock->offset, reader->config->bufArr[0],
                      dataBlk->aSubBlock->szBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tDecmprBlockData(reader->config->bufArr[0], dataBlk->aSubBlock->szBlock, bData, &reader->config->bufArr[1]);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}
#endif

// Return the STombBlk array of the .tomb file, loading it on first use.
// tombBlkArray[0] points at storage owned by the reader.
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!reader->ctx->tombBlkLoaded) {
    code = tsdbDataFileReadTombFooter(reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (reader->tombFooter->tombBlkPtr->size > 0) {
      void *data = taosMemoryMalloc(reader->tombFooter->tombBlkPtr->size);
      if (data == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data,
                          reader->tombFooter->tombBlkPtr->size);
      if (code) {
        taosMemoryFree(data);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // The raw buffer is handed over to the array, which now owns it.
      int32_t size = reader->tombFooter->tombBlkPtr->size / sizeof(STombBlk);
      TARRAY2_INIT_EX(reader->tombBlkArray, size, size, data);
    } else {
      TARRAY2_INIT(reader->tombBlkArray);
    }

    reader->ctx->tombBlkLoaded = true;
  }
  tombBlkArray[0] = reader->tombBlkArray;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Read the block described by tombBlk from the .tomb file and decompress its
// columns into tData (which is cleared first).
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t size = 0;  // bytes of bufArr[0] consumed so far
  tTombBlockClear(tData);
  for (int32_t i = 0; i < ARRAY_SIZE(tData->dataArr); ++i) {
    code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
                          &reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->config->bufArr[2]);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND_BATCH(&tData->dataArr[i], reader->config->bufArr[1], tombBlk->numRec);
    TSDB_CHECK_CODE(code, lino, _exit);

    size += tombBlk->size[i];
  }
  ASSERT(size == tombBlk->dp->size);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// SDataFileWriter =============================================
struct SDataFileWriter {
  SDataFileWriterConfig config[1];

  // Fallback schema caches / scratch buffers used when the config supplies none.
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
  uint8_t *bufArr[5];

  // Iteration state over the existing (old) file set being merged.
  struct {
    bool             opened;
    SDataFileReader *reader;

    // for ts data
    TABLEID tbid[1];
    bool    tbHasOldData;

    const TBrinBlkArray *brinBlkArray;
    int32_t              brinBlkArrayIdx;
    SBrinBlock           brinBlock[1];
    int32_t              brinBlockIdx;
    SBlockData           blockData[1];
    int32_t              blockDataIdx;
    // for tomb data
    bool                 hasOldTomb;
    const TTombBlkArray *tombBlkArray;
    int32_t              tombBlkArrayIdx;
    STombBlock           tombBlock[1];
    int32_t              tombBlockIdx;
  } ctx[1];

  STFile   files[TSDB_FTYPE_MAX];
  STsdbFD *fd[TSDB_FTYPE_MAX];

  SHeadFooter headFooter[1];
  STombFooter tombFooter[1];

  // Pending output that has not been flushed yet.
  TBrinBlkArray brinBlkArray[1];
  SBrinBlock    brinBlock[1];
  SBlockData    blockData[1];

  TTombBlkArray tombBlkArray[1];
  STombBlock    tombBlock[1];
};

// Abort path of the writer; not implemented (asserts unconditionally).
static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) {
  ASSERT(0);
  return 0;
}

// Release everything the writer holds: the reader over the old files, pending
// and iteration blocks, scratch buffers and cached schemas.
static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
  if (writer->ctx->reader) {
    tsdbDataFileReaderClose(&writer->ctx->reader);
  }

  tTombBlockDestroy(writer->tombBlock);
  TARRAY2_DESTROY(writer->tombBlkArray, NULL);
  tBlockDataDestroy(writer->blockData);
  tBrinBlockDestroy(writer->brinBlock);
  TARRAY2_DESTROY(writer->brinBlkArray, NULL);

  tTombBlockDestroy(writer->ctx->tombBlock);
  tBlockDataDestroy(writer->ctx->blockData);
  tBrinBlockDestroy(writer->ctx->brinBlock);

  for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) {
    tFree(writer->bufArr[i]);
  }

  tDestroyTSchema(writer->skmRow->pTSchema);
  tDestroyTSchema(writer->skmTb->pTSchema);
  return 0;
}

// Open a reader over the existing file set, if at least one of the configured
// files exists. writer->ctx->reader is left untouched otherwise.
static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
    if (writer->config->files[i].exist) {
      SDataFileReaderConfig config[1] = {{
          .tsdb = writer->config->tsdb,
          .szPage = writer->config->szPage,
      }};

      // One existing file is enough to need a reader; hand it all of them.
      for (int32_t j = 0; j < TSDB_FTYPE_MAX; ++j) {
        config->files[j].exist = writer->config->files[j].exist;
        if (config->files[j].exist) {
          config->files[j].file = writer->config->files[j].file;
        }
      }

      code = tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader);
      TSDB_CHECK_CODE(code, lino, _exit);
      break;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Deferred initialisation of the writer: wire up default schema caches and
// buffers, open the reader over the old files and set up the output file
// descriptors (STFile). .head and .tomb always start as new, empty files;
// .data and .sma continue the existing file when there is one.
static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t ftype;

  if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
  if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
  if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr;

  // open reader
  code = tsdbDataFileWriterDoOpenReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // .head
  ftype = TSDB_FTYPE_HEAD;
  writer->files[ftype] = (STFile){
      .type = ftype,
      .did = writer->config->did,
      .fid = writer->config->fid,
      .cid = writer->config->cid,
      .size = 0,
  };

  // .data
  ftype = TSDB_FTYPE_DATA;
  if (writer->config->files[ftype].exist) {
    writer->files[ftype] = writer->config->files[ftype].file;
  } else {
    writer->files[ftype] = (STFile){
        .type = ftype,
        .did = writer->config->did,
        .fid = writer->config->fid,
        .cid = writer->config->cid,
        .size = 0,
    };
  }

  // .sma
  ftype = TSDB_FTYPE_SMA;
  if (writer->config->files[ftype].exist) {
    writer->files[ftype] = writer->config->files[ftype].file;
  } else {
    writer->files[ftype] = (STFile){
        .type = ftype,
        .did = writer->config->did,
        .fid = writer->config->fid,
        .cid = writer->config->cid,
        .size = 0,
    };
  }

  // .tomb
  ftype = TSDB_FTYPE_TOMB;
  writer->files[ftype] = (STFile){
      .type = ftype,
      .did = writer->config->did,
      .fid = writer->config->fid,
      .cid = writer->config->cid,
      .size = 0,
  };

  writer->ctx->opened = true;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Flush the pending SBrinBlock to the .head file: compress and append every
// column, record the resulting SBrinBlk in writer->brinBlkArray and clear the
// pending block. Does nothing when the pending block is empty.
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
  if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  // get SBrinBlk
  SBrinBlk brinBlk[1] = {
      {
          .dp[0] =
              {
                  .offset = writer->files[TSDB_FTYPE_HEAD].size,
                  .size = 0,
              },
          .minTbid =
              {
                  .suid = TARRAY2_FIRST(writer->brinBlock->suid),
                  .uid = TARRAY2_FIRST(writer->brinBlock->uid),
              },
          .maxTbid =
              {
                  .suid = TARRAY2_LAST(writer->brinBlock->suid),
                  .uid = TARRAY2_LAST(writer->brinBlock->uid),
              },
          .minVer = TARRAY2_FIRST(writer->brinBlock->minVer),
          // Seed from the first record's maxVer: the loop below starts at 1,
          // so record 0 would otherwise never contribute to the maximum.
          .maxVer = TARRAY2_FIRST(writer->brinBlock->maxVer),
          .numRec = BRIN_BLOCK_SIZE(writer->brinBlock),
          .cmprAlg = writer->config->cmprAlg,
      },
  };

  for (int32_t i = 1; i < BRIN_BLOCK_SIZE(writer->brinBlock); i++) {
    if (brinBlk->minVer > TARRAY2_GET(writer->brinBlock->minVer, i)) {
      brinBlk->minVer = TARRAY2_GET(writer->brinBlock->minVer, i);
    }
    if (brinBlk->maxVer < TARRAY2_GET(writer->brinBlock->maxVer, i)) {
      brinBlk->maxVer = TARRAY2_GET(writer->brinBlock->maxVer, i);
    }
  }

  // write to file
  for (int32_t i = 0; i < ARRAY_SIZE(writer->brinBlock->dataArr1); i++) {
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr1 + i),
                        TARRAY2_DATA_LEN(writer->brinBlock->dataArr1 + i), TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg,
                        &writer->config->bufArr[0], 0, &brinBlk->size[i], &writer->config->bufArr[1]);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0],
                         brinBlk->size[i]);
    TSDB_CHECK_CODE(code, lino, _exit);

    // dp has exactly one element; accumulate the total block size there
    // (indexing dp with the column index would write out of bounds).
    brinBlk->dp->size += brinBlk->size[i];
    writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[i];
  }

  // j continues the index into brinBlk->size[] after the dataArr1 columns.
  for (int32_t i = 0, j = ARRAY_SIZE(writer->brinBlock->dataArr1); i < ARRAY_SIZE(writer->brinBlock->dataArr2);
       i++, j++) {
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr2 + i),
                        TARRAY2_DATA_LEN(writer->brinBlock->dataArr2 + i), TSDB_DATA_TYPE_INT, brinBlk->cmprAlg,
                        &writer->config->bufArr[0], 0, &brinBlk->size[j], &writer->config->bufArr[1]);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0],
                         brinBlk->size[j]);
    TSDB_CHECK_CODE(code, lino, _exit);

    brinBlk->dp->size += brinBlk->size[j];
    writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[j];
  }

  // append to brinBlkArray
  code = TARRAY2_APPEND_PTR(writer->brinBlkArray, brinBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  tBrinBlockClear(writer->brinBlock);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Add one record to the pending SBrinBlock and flush the block once it holds
// at least config->maxRow records.
static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tBrinBlockPut(writer->brinBlock, record);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (BRIN_BLOCK_SIZE(writer->brinBlock) >= writer->config->maxRow) {
    code = tsdbDataFileWriteBrinBlock(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Write bData as one block: compressed rows go to the .data file, per-column
// aggregates to the .sma file, and an SBrinRecord describing both is queued.
// bData is cleared on success. Does nothing when bData has no rows.
static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
  if (bData->nRow == 0) return 0;

  ASSERT(bData->uid);

  int32_t code = 0;
  int32_t lino = 0;

  SBrinRecord record[1] = {{
      .suid = bData->suid,
      .uid = bData->uid,
      .firstKey = bData->aTSKEY[0],
      .firstKeyVer = bData->aVersion[0],
      .lastKey = bData->aTSKEY[bData->nRow - 1],
      .lastKeyVer = bData->aVersion[bData->nRow - 1],
      .minVer = bData->aVersion[0],
      .maxVer = bData->aVersion[0],
      .blockOffset = writer->files[TSDB_FTYPE_DATA].size,
      .smaOffset = writer->files[TSDB_FTYPE_SMA].size,
      .blockSize = 0,
      .blockKeySize = 0,
      .smaSize = 0,
      .numRow = bData->nRow,
      .count = 1,
  }};

  // count = number of distinct timestamps; min/maxVer span all rows.
  for (int32_t i = 1; i < bData->nRow; ++i) {
    if (bData->aTSKEY[i] != bData->aTSKEY[i - 1]) {
      record->count++;
    }
    if (bData->aVersion[i] < record->minVer) {
      record->minVer = bData->aVersion[i];
    }
    if (bData->aVersion[i] > record->maxVer) {
      record->maxVer = bData->aVersion[i];
    }
  }

  // to .data file
  int32_t sizeArr[5] = {0};

  code = tCmprBlockData(bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  record->blockKeySize = sizeArr[3] + sizeArr[2];
  record->blockSize = sizeArr[0] + sizeArr[1] + record->blockKeySize;

  // Buffers are written in the order 3, 2, 1, 0 so the key part comes first.
  for (int32_t i = 3; i >= 0; --i) {
    if (sizeArr[i]) {
      code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size,
                           writer->config->bufArr[i], sizeArr[i]);
      TSDB_CHECK_CODE(code, lino, _exit);
      writer->files[TSDB_FTYPE_DATA].size += sizeArr[i];
    }
  }

  // to .sma file
  for (int32_t i = 0; i < bData->nColData; ++i) {
    SColData *colData = bData->aColData + i;
    if ((!colData->smaOn) || ((colData->flag & HAS_VALUE) == 0)) continue;

    SColumnDataAgg sma[1] = {{.colId = colData->cid}};
    tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull);

    // First call with NULL only computes the encoded size.
    int32_t size = tPutColumnDataAgg(NULL, sma);

    code = tRealloc(&writer->config->bufArr[0], record->smaSize + size);
    TSDB_CHECK_CODE(code, lino, _exit);

    tPutColumnDataAgg(writer->config->bufArr[0] + record->smaSize, sma);
    record->smaSize += size;
  }

  if (record->smaSize > 0) {
    code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, writer->config->bufArr[0], record->smaSize);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->files[TSDB_FTYPE_SMA].size += record->smaSize;
  }

  // append SBrinRecord
  code = tsdbDataFileWriteBrinRecord(writer, record);
  TSDB_CHECK_CODE(code, lino, _exit);

  tBlockDataClear(bData);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Append the raw SDataBlk array of the current table to the .head file.
// Does nothing when the array is empty.
static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkArray *dataBlkArray) {
  if (TARRAY2_SIZE(dataBlkArray) == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  int32_t   ftype = TSDB_FTYPE_HEAD;
  SBlockIdx blockIdx[1] = {{
      .suid = writer->ctx->tbid->suid,
      .uid = writer->ctx->tbid->uid,
      .offset = writer->files[ftype].size,
      .size = TARRAY2_DATA_LEN(dataBlkArray),
  }};

  code =
      tsdbWriteFile(writer->fd[ftype], blockIdx->offset, (const uint8_t *)TARRAY2_DATA(dataBlkArray), blockIdx->size);
  TSDB_CHECK_CODE(code, lino, _exit);
  writer->files[ftype].size += blockIdx->size;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Merge one row into the pending block of the current table. A row whose
// version is <= config->compactVersion and whose timestamp equals that of the
// last pending row updates that row in place; any other row is appended,
// flushing the pending block first when it already holds config->maxRow rows.
static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;

  // update/append
  if (row->type == TSDBROW_ROW_FMT) {
    code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  TSDBKEY key[1];
  if (row->type == TSDBROW_ROW_FMT) {
    key->ts = row->pTSRow->ts;
    key->version = row->version;
  } else {
    key->ts = row->pBlockData->aTSKEY[row->iRow];
    key->version = row->pBlockData->aVersion[row->iRow];
  }

  if (key->version <= writer->config->compactVersion &&
      writer->blockData->nRow > 0 &&
      writer->blockData->aTSKEY[writer->blockData->nRow - 1] == key->ts) {
    code = tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    if (writer->blockData->nRow >= writer->config->maxRow) {
      code = tsdbDataFileDoWriteBlockData(writer, writer->blockData);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Copy the current table's old data that does not sort after `key` to the
// output. The old data is walked at three levels: rows of the loaded block
// (ctx->blockData), records of the loaded index block (ctx->brinBlock) and
// entries of ctx->brinBlkArray. Old blocks lying entirely before `key` are
// carried over by re-emitting their SBrinRecord; a block that may straddle
// `key` is loaded and merged row by row. ctx->tbHasOldData is cleared once
// the table's old data is exhausted.
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TSDBKEY *key) {
  if (writer->ctx->tbHasOldData == false) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    for (;;) {
      // SBlockData
      for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) {
        if (key->ts < writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] ||
            (key->ts == writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] &&
             key->version < writer->ctx->blockData->aVersion[writer->ctx->blockDataIdx])) {
          // Next old row sorts after key: stop, the caller writes its row first.
          goto _exit;
        } else {
          TSDBROW row1 = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);
          code = tsdbDataFileDoWriteTSRow(writer, &row1);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }

      // SBrinBlock
      if (writer->ctx->brinBlockIdx >= BRIN_BLOCK_SIZE(writer->ctx->brinBlock)) {
        break;
      }

      for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) {
        if (TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx) != writer->ctx->tbid->uid) {
          // Reached another table's records.
          writer->ctx->tbHasOldData = false;
          goto _exit;
        }

        if (key->ts < TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) ||
            (key->ts == TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) &&
             key->version < TARRAY2_GET(writer->ctx->brinBlock->firstKeyVer, writer->ctx->brinBlockIdx))) {
          goto _exit;
        } else {
          SBrinRecord record[1];
          tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);
          if (key->ts > TARRAY2_GET(writer->ctx->brinBlock->lastKey, writer->ctx->brinBlockIdx) ||
              (key->ts == TARRAY2_GET(writer->ctx->brinBlock->lastKey, writer->ctx->brinBlockIdx) &&
               key->version > TARRAY2_GET(writer->ctx->brinBlock->lastKeyVer, writer->ctx->brinBlockIdx))) {
            // Whole old block precedes key: keep it as is, after flushing
            // pending rows.
            if (writer->blockData->nRow > 0) {
              code = tsdbDataFileDoWriteBlockData(writer, writer->blockData);
              TSDB_CHECK_CODE(code, lino, _exit);
            }

            code = tsdbDataFileWriteBrinRecord(writer, record);
            TSDB_CHECK_CODE(code, lino, _exit);
          } else {
            // Block may straddle key: load it and merge at row level.
            code = tsdbDataFileReadBlockData(writer->ctx->reader, record, writer->ctx->blockData);
            TSDB_CHECK_CODE(code, lino, _exit);

            writer->ctx->blockDataIdx = 0;
            writer->ctx->brinBlockIdx++;
            break;
          }
        }
      }
    }

    // SBrinBlk
    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      writer->ctx->tbHasOldData = false;
      goto _exit;
    }

    for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) {
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

      if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) {
        writer->ctx->tbHasOldData = false;
        goto _exit;
      }

      code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock);
      TSDB_CHECK_CODE(code, lino, _exit);

      writer->ctx->brinBlockIdx = 0;
      writer->ctx->brinBlkArrayIdx++;
      break;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Write one new row of the current table, first draining the table's old
// data that does not sort after the row's (ts, version) key.
static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbHasOldData) {
    TSDBKEY key[1];
    if (row->type == TSDBROW_ROW_FMT) {
      key->ts = row->pTSRow->ts;
      key->version = row->version;
    } else {
      key->ts = row->pBlockData->aTSKEY[row->iRow];
      key->version = row->pBlockData->aVersion[row->iRow];
    }

    code = tsdbDataFileDoWriteTableOldData(writer, key);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbDataFileDoWriteTSRow(writer, row);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Finish the current table: drain its remaining old data and flush the
// pending block. Does nothing when no table is active (uid == 0).
static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
  if (writer->ctx->tbid->uid == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->tbHasOldData) {
    // The maximum key makes every remaining old row qualify.
    TSDBKEY key = {
        .ts = TSKEY_MAX,
        .version = VERSION_MAX,
    };

    code = tsdbDataFileDoWriteTableOldData(writer, &key);
    TSDB_CHECK_CODE(code, lino, _exit);

    ASSERT(writer->ctx->tbHasOldData == false);
  }

  code = tsdbDataFileDoWriteBlockData(writer, writer->blockData);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Start writing table `tbid`. Old index records of tables that sort before
// tbid are carried over unchanged, except those of tables for which
// metaGetInfo reports TSDB_CODE_NOT_FOUND, which are dropped.
// ctx->tbHasOldData is set when the old files contain data for tbid itself.
// tbid->uid == INT64_MAX is treated as an end marker: the old data is still
// drained, but no schema/block setup is done.
static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) {
  int32_t code = 0;
  int32_t lino = 0;

  SMetaInfo info;

  ASSERT(writer->ctx->blockDataIdx == writer->ctx->blockData->nRow);
  ASSERT(writer->blockData->nRow == 0);

  writer->ctx->tbHasOldData = false;
  while (writer->ctx->brinBlkArray) {
    // skip data of previous table
    for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) {
      // skip removed table
      int64_t uid = TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx);
      if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, uid, &info, NULL) == TSDB_CODE_NOT_FOUND) {
        // Advance to the last consecutive record with the same uid; the outer
        // loop's increment then moves past it.
        for (int32_t idx = writer->ctx->brinBlockIdx + 1;
             idx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock) && uid == TARRAY2_GET(writer->ctx->brinBlock->uid, idx);
             idx++, writer->ctx->brinBlockIdx++) {
        }
        continue;
      }

      SBrinRecord record[1];
      tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);

      int32_t c = tTABLEIDCmprFn(record, tbid);
      if (c < 0) {
        code = tsdbDataFileWriteBrinRecord(writer, record);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        if (c == 0) {
          writer->ctx->tbHasOldData = true;
        }
        goto _begin;
      }
    }

    if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) {
      writer->ctx->brinBlkArray = NULL;
      break;
    }

    for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) {
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx);

      code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock);
      TSDB_CHECK_CODE(code, lino, _exit);

      writer->ctx->brinBlockIdx = 0;
      writer->ctx->brinBlkArrayIdx++;
      break;
    }
  }

_begin:
  writer->ctx->tbid[0] = *tbid;

  if (tbid->uid == INT64_MAX) goto _exit;

  code = tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tBlockDataInit(writer->blockData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Append the raw SHeadFooter to the end of the .head file.
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_HEAD;
  code = tsdbWriteFile(writer->fd[ftype], writer->files[ftype].size, (const uint8_t *)writer->headFooter,
                       sizeof(SHeadFooter));
  TSDB_CHECK_CODE(code, lino, _exit);
  writer->files[ftype].size += sizeof(SHeadFooter);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Flush the pending STombBlock to the .tomb file: compress and append every
// column, record the resulting STombBlk in writer->tombBlkArray and clear the
// pending block. Does nothing when the pending block is empty.
static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
  if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  STombBlk tombBlk[1] = {{
      .numRec = TOMB_BLOCK_SIZE(writer->tombBlock),
      .minTbid =
          {
              .suid = TARRAY2_FIRST(writer->tombBlock->suid),
              .uid = TARRAY2_FIRST(writer->tombBlock->uid),
          },
      .maxTbid =
          {
              .suid = TARRAY2_LAST(writer->tombBlock->suid),
              .uid = TARRAY2_LAST(writer->tombBlock->uid),
          },
      .minVer = TARRAY2_FIRST(writer->tombBlock->version),
      .maxVer = TARRAY2_FIRST(writer->tombBlock->version),
      .dp[0] =
          {
              .offset = writer->files[TSDB_FTYPE_TOMB].size,
              .size = 0,
          },
      // Record the algorithm actually used below: tsdbDataFileReadTombBlock
      // decompresses with tombBlk->cmprAlg, so it must match the writer.
      .cmprAlg = TWO_STAGE_COMP,
  }};

  for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) {
    tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->tombBlock->version, i));
    tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->tombBlock->version, i));
  }

  for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) {
    int32_t size;
    code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]),
                        TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
                        &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size, writer->config->bufArr[0],
                         size);
    TSDB_CHECK_CODE(code, lino, _exit);

    tombBlk->size[i] = size;
    tombBlk->dp[0].size += size;
    writer->files[TSDB_FTYPE_TOMB].size += size;
  }

  code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  tTombBlockClear(writer->tombBlock);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Append the raw STombBlk array to the .tomb file and store its location in
// the tomb footer. Nothing is written when the array is empty, but the footer
// pointer is still updated (with size 0).
static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ftype = TSDB_FTYPE_TOMB;
  writer->tombFooter->tombBlkPtr->offset = writer->files[ftype].size;
  writer->tombFooter->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);

  if (writer->tombFooter->tombBlkPtr->size) {
    code = tsdbWriteFile(writer->fd[ftype], writer->tombFooter->tombBlkPtr->offset,
                         (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray), writer->tombFooter->tombBlkPtr->size);
    TSDB_CHECK_CODE(code, lino, _exit);
    writer->files[ftype].size += writer->tombFooter->tombBlkPtr->size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

// Append the raw STombFooter to the end of the .tomb file.
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size,
                       (const uint8_t *)writer->tombFooter, sizeof(STombFooter));
  TSDB_CHECK_CODE(code, lino, _exit);
  writer->files[TSDB_FTYPE_TOMB].size += sizeof(STombFooter);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}

static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasOldTomb) {
    for (; writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock); writer->ctx->tombBlockIdx++) {
      STombRecord record1[1] = {{
          .suid = TARRAY2_GET(writer->ctx->tombBlock->suid, writer->ctx->tombBlockIdx),
          .uid = TARRAY2_GET(writer->ctx->tombBlock->uid, writer->ctx->tombBlockIdx),
          .version = TARRAY2_GET(writer->ctx->tombBlock->version, writer->ctx->tombBlockIdx),
          .skey =
TARRAY2_GET(writer->ctx->tombBlock->skey, writer->ctx->tombBlockIdx), .ekey = TARRAY2_GET(writer->ctx->tombBlock->ekey, writer->ctx->tombBlockIdx), }}; int32_t c = tTombRecordCompare(record, record1); if (c < 0) { break; } else if (c > 0) { code = tTombBlockPut(writer->tombBlock, record1); TSDB_CHECK_CODE(code, lino, _exit); if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) { code = tsdbDataFileDoWriteTombBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } } else { ASSERT(0); } } if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) { writer->ctx->hasOldTomb = false; break; } for (; writer->ctx->tombBlkArrayIdx < TARRAY2_SIZE(writer->ctx->tombBlkArray); ++writer->ctx->tombBlkArrayIdx) { const STombBlk *tombBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx); code = tsdbDataFileReadTombBlock(writer->ctx->reader, tombBlk, writer->ctx->tombBlock); TSDB_CHECK_CODE(code, lino, _exit); writer->ctx->tombBlockIdx = 0; writer->ctx->tombBlkArrayIdx++; break; } } _write: if (record->suid == INT64_MAX) goto _exit; code = tTombBlockPut(writer->tombBlock, record); TSDB_CHECK_CODE(code, lino, _exit); if (TOMB_BLOCK_SIZE(writer->tombBlock) >= writer->config->maxRow) { code = tsdbDataFileDoWriteTombBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) { ASSERT(TARRAY2_SIZE(writer->brinBlkArray) > 0); int32_t code = 0; int32_t lino = 0; int32_t ftype = TSDB_FTYPE_HEAD; writer->headFooter->brinBlkPtr->offset = writer->files[ftype].size; writer->headFooter->brinBlkPtr->size = TARRAY2_DATA_LEN(writer->brinBlkArray); code = tsdbWriteFile(writer->fd[ftype], writer->headFooter->brinBlkPtr->offset, (uint8_t *)TARRAY2_DATA(writer->brinBlkArray), writer->headFooter->brinBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); writer->files[ftype].size += 
writer->headFooter->brinBlkPtr->size; _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) { int32_t code = 0; int32_t lino = 0; int32_t ftype; STFileOp op; if (writer->fd[TSDB_FTYPE_HEAD]) { TABLEID tbid[1] = {{ .suid = INT64_MAX, .uid = INT64_MAX, }}; code = tsdbDataFileWriteTableDataEnd(writer); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileWriteTableDataBegin(writer, tbid); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileWriteBrinBlk(writer); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileWriteHeadFooter(writer); TSDB_CHECK_CODE(code, lino, _exit); // .head ftype = TSDB_FTYPE_HEAD; if (writer->config->files[ftype].exist) { op = (STFileOp){ .optype = TSDB_FOP_REMOVE, .fid = writer->config->fid, .of = writer->config->files[ftype].file, }; code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } op = (STFileOp){ .optype = TSDB_FOP_CREATE, .fid = writer->config->fid, .nf = writer->files[ftype], }; code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); // .data ftype = TSDB_FTYPE_DATA; if (!writer->config->files[ftype].exist) { op = (STFileOp){ .optype = TSDB_FOP_CREATE, .fid = writer->config->fid, .nf = writer->files[ftype], }; code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) { op = (STFileOp){ .optype = TSDB_FOP_MODIFY, .fid = writer->config->fid, .of = writer->config->files[ftype].file, .nf = writer->files[ftype], }; code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } // .sma ftype = TSDB_FTYPE_SMA; if (!writer->config->files[ftype].exist) { op = (STFileOp){ .optype = TSDB_FOP_CREATE, .fid = writer->config->fid, .nf = writer->files[ftype], }; code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } else if 
(writer->config->files[ftype].file.size != writer->files[ftype].size) { op = (STFileOp){ .optype = TSDB_FOP_MODIFY, .fid = writer->config->fid, .of = writer->config->files[ftype].file, .nf = writer->files[ftype], }; code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } } if (writer->fd[TSDB_FTYPE_TOMB]) { STombRecord record[1] = {{ .suid = INT64_MAX, .uid = INT64_MAX, .version = INT64_MAX, }}; code = tsdbDataFileDoWriteTombRecord(writer, record); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileDoWriteTombBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileDoWriteTombBlk(writer); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileWriteTombFooter(writer); TSDB_CHECK_CODE(code, lino, _exit); ftype = TSDB_FTYPE_SMA; if (writer->config->files[ftype].exist) { op = (STFileOp){ .optype = TSDB_FOP_REMOVE, .fid = writer->config->fid, .of = writer->config->files[ftype].file, }; code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } op = (STFileOp){ .optype = TSDB_FOP_CREATE, .fid = writer->config->fid, .nf = writer->files[ftype], }; code = TARRAY2_APPEND(opArr, op); TSDB_CHECK_CODE(code, lino, _exit); } for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { if (writer->fd[i]) { code = tsdbFsyncFile(writer->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); tsdbCloseFile(&writer->fd[i]); } } _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_DATA, TSDB_FTYPE_SMA}; for (int32_t i = 0; i < ARRAY_SIZE(ftypes); ++i) { int32_t ftype = ftypes[i]; char fname[TSDB_FILENAME_LEN]; int32_t flag = TD_FILE_READ | TD_FILE_WRITE; if (writer->files[ftype].size == 0) { flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); } tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname); code = tsdbOpenFile(fname, writer->config->szPage, flag, 
&writer->fd[ftype]); TSDB_CHECK_CODE(code, lino, _exit); if (writer->files[ftype].size == 0) { uint8_t hdr[TSDB_FHDR_SIZE] = {0}; code = tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE); TSDB_CHECK_CODE(code, lino, _exit); writer->files[ftype].size += TSDB_FHDR_SIZE; } } if (writer->ctx->reader) { code = tsdbDataFileReadBrinBlk(writer->ctx->reader, &writer->ctx->brinBlkArray); TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) { writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY; writer[0]->config[0] = config[0]; return 0; } int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) { int32_t code = 0; int32_t lino = 0; if (writer[0]->ctx->opened) { if (abort) { code = tsdbDataFileWriterCloseAbort(writer[0]); TSDB_CHECK_CODE(code, lino, _exit); } else { code = tsdbDataFileWriterCloseCommit(writer[0], opArr); TSDB_CHECK_CODE(code, lino, _exit); } tsdbDataFileWriterDoClose(writer[0]); } taosMemoryFree(writer[0]); writer[0] = NULL; _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code); } return code; } int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row) { int32_t code = 0; int32_t lino = 0; if (!writer->ctx->opened) { code = tsdbDataFileWriterDoOpen(writer); TSDB_CHECK_CODE(code, lino, _exit); } if (writer->fd[TSDB_FTYPE_HEAD] == NULL) { code = tsdbDataFileWriterOpenDataFD(writer); TSDB_CHECK_CODE(code, lino, _exit); } if (row->uid != writer->ctx->tbid->uid) { code = tsdbDataFileWriteTableDataEnd(writer); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)row); TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbDataFileDoWriteTSData(writer, &row->row); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { 
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) { if (bData->nRow == 0) return 0; int32_t code = 0; int32_t lino = 0; ASSERT(bData->uid); if (!writer->ctx->opened) { code = tsdbDataFileWriterDoOpen(writer); TSDB_CHECK_CODE(code, lino, _exit); } if (writer->fd[TSDB_FTYPE_DATA] == NULL) { code = tsdbDataFileWriterOpenDataFD(writer); TSDB_CHECK_CODE(code, lino, _exit); } if (bData->uid != writer->ctx->tbid->uid) { code = tsdbDataFileWriteTableDataEnd(writer); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData); TSDB_CHECK_CODE(code, lino, _exit); } if (writer->ctx->tbHasOldData) { TSDBKEY key = { .ts = bData->aTSKEY[0], .version = bData->aVersion[0], }; code = tsdbDataFileDoWriteTableOldData(writer, &key); TSDB_CHECK_CODE(code, lino, _exit); } if (!writer->ctx->tbHasOldData // && writer->blockData->nRow == 0 // ) { code = tsdbDataFileDoWriteBlockData(writer, bData); TSDB_CHECK_CODE(code, lino, _exit); } else { for (int32_t i = 0; i < bData->nRow; ++i) { TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)}; code = tsdbDataFileDoWriteTSData(writer, row); TSDB_CHECK_CODE(code, lino, _exit); } } _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } int32_t tsdbDataFileFlush(SDataFileWriter *writer) { ASSERT(writer->ctx->opened); if (writer->blockData->nRow == 0) return 0; if (writer->ctx->tbHasOldData) return 0; return tsdbDataFileDoWriteBlockData(writer, writer->blockData); } static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; char fname[TSDB_FILENAME_LEN]; int32_t ftype = TSDB_FTYPE_TOMB; ASSERT(writer->files[ftype].size == 0); int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname); code = tsdbOpenFile(fname, 
writer->config->szPage, flag, &writer->fd[ftype]); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; code = tsdbWriteFile(writer->fd[ftype], 0, hdr, TSDB_FHDR_SIZE); TSDB_CHECK_CODE(code, lino, _exit); writer->files[ftype].size += TSDB_FHDR_SIZE; if (writer->ctx->reader) { code = tsdbDataFileReadTombBlk(writer->ctx->reader, &writer->ctx->tombBlkArray); TSDB_CHECK_CODE(code, lino, _exit); if (TARRAY2_SIZE(writer->ctx->tombBlkArray) > 0) { writer->ctx->hasOldTomb = true; } writer->ctx->tombBlkArrayIdx = 0; tTombBlockClear(writer->ctx->tombBlock); writer->ctx->tombBlockIdx = 0; } _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record) { int32_t code = 0; int32_t lino = 0; if (!writer->ctx->opened) { code = tsdbDataFileWriterDoOpen(writer); TSDB_CHECK_CODE(code, lino, _exit); } if (writer->fd[TSDB_FTYPE_TOMB] == NULL) { code = tsdbDataFileWriterOpenTombFD(writer); TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbDataFileDoWriteTombRecord(writer, record); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; }