diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index 7f14740f040edcb6795fcb9823654881042b165c..83468bf621a8e318e21060d0cbf854a12d1f0fca 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -74,7 +74,7 @@ struct SSttFileWriterConfig { STFile file; SSkmInfo *skmTb; SSkmInfo *skmRow; - uint8_t **aBuf; + uint8_t **bufArr; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 05e2037e4bf888375238856edf8488724ad90269..62dc9018235bda72e9fff02c97578e076c0e92ae 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -345,14 +345,18 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - SSttBlk sttBlk[1]; - - sttBlk->suid = writer->bData->suid; - sttBlk->minUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[0]; - sttBlk->maxUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[writer->bData->nRow - 1]; - sttBlk->minKey = sttBlk->maxKey = writer->bData->aTSKEY[0]; - sttBlk->minVer = sttBlk->maxVer = writer->bData->aVersion[0]; - sttBlk->nRow = writer->bData->nRow; + + SSttBlk sttBlk[1] = {{ + .suid = writer->bData->suid, + .minUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[0], + .maxUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[writer->bData->nRow - 1], + .minKey = writer->bData->aTSKEY[0], + .maxKey = writer->bData->aTSKEY[0], + .minVer = writer->bData->aVersion[0], + .maxVer = writer->bData->aVersion[0], + .nRow = writer->bData->nRow, + }}; + for (int32_t iRow = 1; iRow < writer->bData->nRow; iRow++) { if (sttBlk->minKey > writer->bData->aTSKEY[iRow]) sttBlk->minKey = writer->bData->aTSKEY[iRow]; if (sttBlk->maxKey < writer->bData->aTSKEY[iRow]) sttBlk->maxKey = writer->bData->aTSKEY[iRow]; @@ -360,7 +364,7 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow]; } - code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->sizeArr); + code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, writer->sizeArr); TSDB_CHECK_CODE(code, lino, _exit); sttBlk->bInfo.offset = writer->file->size; @@ -369,7 +373,7 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { for (int32_t i = 3; i >= 0; i--) { if (writer->sizeArr[i]) { - code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[i], writer->sizeArr[i]); + code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], writer->sizeArr[i]); TSDB_CHECK_CODE(code, lino, _exit); writer->file->size += writer->sizeArr[i]; } @@ -419,12 +423,12 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) { int32_t size; - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->sData->dataArr[i]), + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->sData->dataArr + i), TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, - &writer->config->aBuf[0], 0, &size, &writer->config->aBuf[1]); + &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size); + code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], size); TSDB_CHECK_CODE(code, lino, _exit); statisBlk->size[i] = size; @@ -480,14 +484,14 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { int32_t size; code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tData->dataArr[i]), TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, - &writer->config->aBuf[0], 0, &size, &writer->config->aBuf[1]); + &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size); + code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], size); TSDB_CHECK_CODE(code, lino, _exit); tombBlk->size[i] = size; - tombBlk->dp[0].size += size; + tombBlk->dp->size += size; writer->file->size += size; } @@ -569,21 +573,21 @@ _exit: static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { writer->footer->prevFooter = writer->config->file.size; int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer)); + if (code) return code; writer->file->size += sizeof(writer->footer); - return code; + return 0; } static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); // set writer->file[0] = writer->config->file; writer->file->stt->nseg++; if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb; if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow; - if (!writer->config->aBuf) writer->config->aBuf = writer->bufArr; + if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr; // open file int32_t flag; @@ -606,11 +610,11 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { writer->file->size += sizeof(hdr); } + writer->ctx->opened = true; + _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); - } else { - writer->ctx->opened = true; + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -623,8 +627,8 @@ static void tsdbSttFWriterDoClose(SSttFileWriter *writer) { } tDestroyTSchema(writer->skmRow->pTSchema); tDestroyTSchema(writer->skmTb->pTSchema); - tStatisBlockFree(writer->sData); tTombBlockFree(writer->tData); + tStatisBlockFree(writer->sData); tBlockDataDestroy(writer->bData); TARRAY2_FREE(writer->tombBlkArray); TARRAY2_FREE(writer->statisBlkArray); @@ -783,6 +787,18 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { }}; code = tStatisBlockPut(writer->sData, record); TSDB_CHECK_CODE(code, lino, _exit); + } else { + TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version); + TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version); + if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) { + TARRAY2_LAST(writer->sData->count)++; + TARRAY2_LAST(writer->sData->lastKey) = key->ts; + TARRAY2_LAST(writer->sData->lastKeyVer) = key->version; + } else if (key->ts == TARRAY2_LAST(writer->sData->lastKey)) { + TARRAY2_LAST(writer->sData->lastKeyVer) = key->version; + } else { + ASSERTS(0, "timestamp should be in ascending order"); + } } if (row->row.type == TSDBROW_ROW_FMT) { @@ -799,6 +815,10 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { : writer->bData->aUid[writer->bData->nRow - 1]) == row->uid // && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts // ) { + if (writer->bData->nRow == 1) { + TARRAY2_LAST(writer->sData->firstKeyVer) = key->version; + } + code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema); TSDB_CHECK_CODE(code, lino, _exit); } else { @@ -811,18 +831,6 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { TSDB_CHECK_CODE(code, lino, _exit); } - TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version); - TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version); - if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) { - TARRAY2_LAST(writer->sData->lastKey) = key->ts; - TARRAY2_LAST(writer->sData->lastKeyVer) = key->version; - TARRAY2_LAST(writer->sData->count)++; - } else if (key->ts == TARRAY2_LAST(writer->sData->lastKey)) { - TARRAY2_LAST(writer->sData->lastKeyVer) = key->version; - } else { - ASSERTS(0, "timestamp should be in ascending order"); - } - _exit: if (code) { TSDB_ERROR_LOG(vid, lino, code);