diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h index cf7d0765eb1f318eab08da6a454238e8256434da..4a5c148751160e1cd198ac54957f97d781393cdb 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h @@ -51,8 +51,8 @@ int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *stati struct SSttFileReaderConfig { STsdb *tsdb; - SSkmInfo *pSkmTb; - SSkmInfo *pSkmRow; + SSkmInfo *skmTb; + SSkmInfo *skmRow; uint8_t **aBuf; int32_t szPage; STFile file; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c index 0055751a86598183dd179d2801ff6e95ee180022..7c76abde0a946022066de69f14cbe6dac96cf9ea 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c @@ -17,13 +17,15 @@ typedef struct { int64_t prevFooter; - SFDataPtr dict[4]; // 0:bloom filter, 1:SSttBlk, 2:STbStatisBlk, 3:SDelBlk - uint8_t reserved[24]; -} SFSttFooter; + SFDataPtr sttBlkPtr[1]; + SFDataPtr delBlkPtr[1]; + SFDataPtr statisBlkPtr[1]; + SFDataPtr rsrvd[2]; +} SSttFooter; // SSttFReader ============================================================ struct SSttFileReader { - SSttFileReaderConfig config; + SSttFileReaderConfig config[1]; TSttSegReaderArray segReaderArray; STsdbFD *fd; }; @@ -38,7 +40,7 @@ struct SSttSegReader { bool statisBlkLoaded; } ctx; - SFSttFooter footer; + SSttFooter footer[1]; void *bloomFilter; TSttBlkArray sttBlkArray; TDelBlkArray delBlkArray; @@ -49,7 +51,7 @@ struct SSttSegReader { static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SSttSegReader **segReader) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(reader->config.tsdb->pVnode); + int32_t vid = TD_VID(reader->config->tsdb->pVnode); ASSERT(offset >= TSDB_FHDR_SIZE); @@ -97,7 +99,7 @@ int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader ** reader[0] = taosMemoryCalloc(1, sizeof(*reader[0])); if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; - reader[0]->config = config[0]; + reader[0]->config[0] = config[0]; TARRAY2_INIT(&reader[0]->segReaderArray); // open file @@ -111,13 +113,13 @@ int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader ** while (size > 0) { SSttSegReader *segReader; - code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SFSttFooter), &segReader); + code = tsdbSttSegReaderOpen(reader[0], size - sizeof(SSttFooter), &segReader); TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND(&reader[0]->segReaderArray, segReader); TSDB_CHECK_CODE(code, lino, _exit); - size = segReader->footer.prevFooter; + size = segReader->footer->prevFooter; } ASSERT(TARRAY2_SIZE(&reader[0]->segReaderArray) == config->file.stt.nseg); @@ -152,15 +154,15 @@ int32_t tsdbSttFReadBloomFilter(SSttSegReader *reader, const void *pFilter) { int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) { if (!reader->ctx.statisBlkLoaded) { - SFDataPtr fptr = reader->footer.dict[2]; - if (fptr.size > 0) { - ASSERT(fptr.size % sizeof(STbStatisBlk) == 0); + if (reader->footer->statisBlkPtr->size > 0) { + ASSERT(reader->footer->statisBlkPtr->size % sizeof(STbStatisBlk) == 0); - int32_t size = fptr.size / sizeof(STbStatisBlk); - void *data = taosMemoryMalloc(fptr.size); + int32_t size = reader->footer->statisBlkPtr->size / sizeof(STbStatisBlk); + void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; - int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size); + int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->statisBlkPtr->offset, data, + reader->footer->statisBlkPtr->size); if (code) return code; TARRAY2_INIT_EX(&reader->statisBlkArray, size, size, data); @@ -177,15 +179,15 @@ int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **sta int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray) { if (!reader->ctx.delBlkLoaded) { - SFDataPtr fptr = reader->footer.dict[3]; - if (fptr.size > 0) { - ASSERT(fptr.size % sizeof(SDelBlk) == 0); + if (reader->footer->delBlkPtr->size > 0) { + ASSERT(reader->footer->delBlkPtr->size % sizeof(SDelBlk) == 0); - int32_t size = fptr.size / sizeof(SDelBlk); - void *data = taosMemoryMalloc(fptr.size); + int32_t size = reader->footer->delBlkPtr->size / sizeof(SDelBlk); + void *data = taosMemoryMalloc(reader->footer->delBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; - int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size); + int32_t code = + tsdbReadFile(reader->reader->fd, reader->footer->delBlkPtr->offset, data, reader->footer->delBlkPtr->size); if (code) return code; TARRAY2_INIT_EX(&reader->delBlkArray, size, size, data); @@ -202,15 +204,15 @@ int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArr int32_t tsdbSttFReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) { if (!reader->ctx.sttBlkLoaded) { - SFDataPtr fptr = reader->footer.dict[1]; - if (fptr.size > 0) { - ASSERT(fptr.size % sizeof(SSttBlk) == 0); + if (reader->footer->sttBlkPtr->size > 0) { + ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0); - int32_t size = fptr.size / sizeof(SSttBlk); - void *data = taosMemoryMalloc(fptr.size); + int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk); + void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; - int32_t code = tsdbReadFile(reader->reader->fd, fptr.offset, data, fptr.size); + int32_t code = + tsdbReadFile(reader->reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size); if (code) return code; TARRAY2_INIT_EX(&reader->sttBlkArray, size, size, data); @@ -245,18 +247,17 @@ int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *stati // SSttFWriter ============================================================ struct SSttFileWriter { - SSttFileWriterConfig config; + SSttFileWriterConfig config[1]; struct { bool opened; } ctx; // file STFile file; // data - TSttBlkArray sttBlkArray; - TDelBlkArray delBlkArray; - TStatisBlkArray statisBlkArray; - void *bloomFilter; // TODO - SFSttFooter footer; + TSttBlkArray sttBlkArray[1]; + TDelBlkArray delBlkArray[1]; + TStatisBlkArray statisBlkArray[1]; + SSttFooter footer[1]; SBlockData bData[1]; SDelBlock dData[1]; STbStatisBlock sData[1]; @@ -288,7 +289,7 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow]; } - code = tCmprBlockData(writer->bData, writer->config.cmprAlg, NULL, NULL, writer->config.aBuf, writer->aBufSize); + code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->aBufSize); TSDB_CHECK_CODE(code, lino, _exit); sttBlk->bInfo.offset = writer->file.size; @@ -297,19 +298,19 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { for (int32_t i = 3; i >= 0; i--) { if (writer->aBufSize[i]) { - code = tsdbWriteFile(writer->fd, writer->file.size, writer->config.aBuf[i], writer->aBufSize[i]); + code = tsdbWriteFile(writer->fd, writer->file.size, writer->config->aBuf[i], writer->aBufSize[i]); TSDB_CHECK_CODE(code, lino, _exit); writer->file.size += writer->aBufSize[i]; } } tBlockDataClear(writer->bData); - code = TARRAY2_APPEND_P(&writer->sttBlkArray, sttBlk); + code = TARRAY2_APPEND_P(writer->sttBlkArray, sttBlk); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -348,12 +349,12 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { } tTbStatisBlockClear(writer->sData); - code = TARRAY2_APPEND_P(&writer->statisBlkArray, statisBlk); + code = TARRAY2_APPEND_P(writer->statisBlkArray, statisBlk); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -391,12 +392,12 @@ static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) { } tDelBlockDestroy(writer->dData); - code = TARRAY2_APPEND_P(&writer->delBlkArray, delBlk); + code = TARRAY2_APPEND_P(writer->delBlkArray, delBlk); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { // tsdbTrace(); @@ -408,20 +409,20 @@ static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer.dict[1].offset = writer->file.size; - writer->footer.dict[1].size = sizeof(SSttBlk) * TARRAY2_SIZE(&writer->sttBlkArray); + writer->footer->sttBlkPtr->offset = writer->file.size; + writer->footer->sttBlkPtr->size = sizeof(SSttBlk) * TARRAY2_SIZE(writer->sttBlkArray); - if (writer->footer.dict[1].size) { - code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(&writer->sttBlkArray), - writer->footer.dict[1].size); + if (writer->footer->sttBlkPtr->size) { + code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray), + writer->footer->sttBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file.size += writer->footer.dict[1].size; + writer->file.size += writer->footer->sttBlkPtr->size; } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -431,19 +432,19 @@ static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer.dict[2].offset = writer->file.size; - writer->footer.dict[2].size = sizeof(STbStatisBlock) * TARRAY2_SIZE(&writer->statisBlkArray); + writer->footer->statisBlkPtr->offset = writer->file.size; + writer->footer->statisBlkPtr->size = sizeof(STbStatisBlock) * TARRAY2_SIZE(writer->statisBlkArray); - if (writer->footer.dict[2].size) { - code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(&writer->statisBlkArray), - writer->footer.dict[2].size); + if (writer->footer->statisBlkPtr->size) { + code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray), + writer->footer->statisBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file.size += writer->footer.dict[2].size; + writer->file.size += writer->footer->statisBlkPtr->size; } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -453,19 +454,19 @@ static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer.dict[3].offset = writer->file.size; - writer->footer.dict[3].size = sizeof(SDelBlk) * TARRAY2_SIZE(&writer->delBlkArray); + writer->footer->delBlkPtr->offset = writer->file.size; + writer->footer->delBlkPtr->size = sizeof(SDelBlk) * TARRAY2_SIZE(writer->delBlkArray); - if (writer->footer.dict[3].size) { - code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(&writer->delBlkArray), - writer->footer.dict[3].size); + if (writer->footer->delBlkPtr->size) { + code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->delBlkArray), + writer->footer->delBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file.size += writer->footer.dict[3].size; + writer->file.size += writer->footer->delBlkPtr->size; } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -480,14 +481,14 @@ static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(writer->config.tsdb->pVnode); + int32_t vid = TD_VID(writer->config->tsdb->pVnode); // set - writer->file = writer->config.file; + writer->file = writer->config->file; writer->file.stt.nseg++; - if (!writer->config.skmTb) writer->config.skmTb = &writer->skmTb; - if (!writer->config.skmRow) writer->config.skmRow = &writer->skmRow; - if (!writer->config.aBuf) writer->config.aBuf = writer->aBuf; + if (!writer->config->skmTb) writer->config->skmTb = &writer->skmTb; + if (!writer->config->skmRow) writer->config->skmRow = &writer->skmRow; + if (!writer->config->aBuf) writer->config->aBuf = writer->aBuf; // open file int32_t flag; @@ -499,8 +500,8 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; } - tsdbTFileName(writer->config.tsdb, &writer->file, fname); - code = tsdbOpenFile(fname, writer->config.szPage, flag, &writer->fd); + tsdbTFileName(writer->config->tsdb, &writer->file, fname); + code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd); TSDB_CHECK_CODE(code, lino, _exit); if (!writer->file.size) { @@ -537,7 +538,7 @@ static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) { static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) { int32_t lino; int32_t code; - int32_t vid = TD_VID(writer->config.tsdb->pVnode); + int32_t vid = TD_VID(writer->config->tsdb->pVnode); code = tsdbSttFileDoWriteTSDataBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); @@ -571,10 +572,10 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) { tsdbCloseFile(&writer->fd); - ASSERT(writer->config.file.size > writer->file.size); - op->optype = writer->config.file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE; - op->fid = writer->config.file.fid; - op->of = writer->config.file; + ASSERT(writer->config->file.size > writer->file.size); + op->optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE; + op->fid = writer->config->file.fid; + op->of = writer->config->file; op->nf = writer->file; _exit: @@ -587,11 +588,11 @@ _exit: static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) { char fname[TSDB_FILENAME_LEN]; - tsdbTFileName(writer->config.tsdb, &writer->config.file, fname); - if (writer->config.file.size) { // truncate the file to the original size - ASSERT(writer->config.file.size <= writer->file.size); - if (writer->config.file.size < writer->file.size) { - taosFtruncateFile(writer->fd->pFD, writer->config.file.size); + tsdbTFileName(writer->config->tsdb, &writer->config->file, fname); + if (writer->config->file.size) { // truncate the file to the original size + ASSERT(writer->config->file.size <= writer->file.size); + if (writer->config->file.size < writer->file.size) { + taosFtruncateFile(writer->fd->pFD, writer->config->file.size); tsdbCloseFile(&writer->fd); } } else { // remove the file @@ -606,7 +607,7 @@ int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter ** writer[0] = taosMemoryMalloc(sizeof(*writer[0])); if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; - writer[0]->config = config[0]; + writer[0]->config[0] = config[0]; writer[0]->ctx.opened = false; return 0; } @@ -614,7 +615,7 @@ int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter ** int32_t tsdbSttFWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(writer[0]->config.tsdb->pVnode); + int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode); if (!writer[0]->ctx.opened) { op->optype = TSDB_FOP_NONE; @@ -656,7 +657,7 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) { TSDB_CHECK_CODE(code, lino, _exit); } - if (writer->sData[0].nRow >= writer->config.maxRow) { + if (writer->sData[0].nRow >= writer->config->maxRow) { code = tsdbSttFileDoWriteStatisBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } @@ -670,26 +671,26 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) { writer->sData[0].aData[6][writer->sData[0].nRow] = 1; // count writer->sData[0].nRow++; - code = tsdbUpdateSkmTb(writer->config.tsdb, tbid, writer->config.skmTb); + code = tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb); TSDB_CHECK_CODE(code, lino, _exit); TABLEID id = { .suid = tbid->suid, .uid = tbid->uid ? 0 : tbid->uid, }; - code = tBlockDataInit(&writer->bData[0], &id, writer->config.skmTb->pTSchema, NULL, 0); + code = tBlockDataInit(&writer->bData[0], &id, writer->config->skmTb->pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } if (row->row.type == TSDBROW_ROW_FMT) { - code = tsdbUpdateSkmRow(writer->config.tsdb, tbid, TSDBROW_SVERSION(pRow), writer->config.skmRow); + code = tsdbUpdateSkmRow(writer->config->tsdb, tbid, TSDBROW_SVERSION(pRow), writer->config->skmRow); TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(&writer->bData[0], pRow, writer->config.skmRow->pTSchema, tbid->uid); + code = tBlockDataAppendRow(&writer->bData[0], pRow, writer->config->skmRow->pTSchema, tbid->uid); TSDB_CHECK_CODE(code, lino, _exit); - if (writer->bData[0].nRow >= writer->config.maxRow) { + if (writer->bData[0].nRow >= writer->config->maxRow) { code = tsdbSttFileDoWriteTSDataBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } @@ -707,7 +708,7 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -729,7 +730,7 @@ int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, tstrerror(code)); } return 0; @@ -751,7 +752,7 @@ int32_t tsdbSttFWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDe writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey writer->dData[0].nRow++; - if (writer->dData[0].nRow >= writer->config.maxRow) { + if (writer->dData[0].nRow >= writer->config->maxRow) { return tsdbSttFileDoWriteDelBlock(writer); } else { return 0;