提交 5b1492cb 编写于 作者: H Hongze Cheng

refact code

上级 c161943f
...@@ -25,6 +25,11 @@ typedef struct { ...@@ -25,6 +25,11 @@ typedef struct {
SFDataPtr rsrvd[2]; SFDataPtr rsrvd[2];
} STombFooter; } STombFooter;
extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr,
int64_t *fileSize);
// SDataFileReader ============================================= // SDataFileReader =============================================
struct SDataFileReader { struct SDataFileReader {
SDataFileReaderConfig config[1]; SDataFileReaderConfig config[1];
...@@ -644,81 +649,89 @@ _exit: ...@@ -644,81 +649,89 @@ _exit:
return code; return code;
} }
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0; TBrinBlkArray *brinBlkArray, uint8_t **bufArr) {
if (BRIN_BLOCK_SIZE(brinBlock) == 0) return 0;
int32_t code = 0; int32_t code;
int32_t lino = 0;
// get SBrinBlk // get SBrinBlk
SBrinBlk brinBlk[1] = { SBrinBlk brinBlk[1] = {
{ {
.dp[0] = .dp[0] =
{ {
.offset = writer->files[TSDB_FTYPE_HEAD].size, .offset = *fileSize,
.size = 0, .size = 0,
}, },
.minTbid = .minTbid =
{ {
.suid = TARRAY2_FIRST(writer->brinBlock->suid), .suid = TARRAY2_FIRST(brinBlock->suid),
.uid = TARRAY2_FIRST(writer->brinBlock->uid), .uid = TARRAY2_FIRST(brinBlock->uid),
}, },
.maxTbid = .maxTbid =
{ {
.suid = TARRAY2_LAST(writer->brinBlock->suid), .suid = TARRAY2_LAST(brinBlock->suid),
.uid = TARRAY2_LAST(writer->brinBlock->uid), .uid = TARRAY2_LAST(brinBlock->uid),
}, },
.minVer = TARRAY2_FIRST(writer->brinBlock->minVer), .minVer = TARRAY2_FIRST(brinBlock->minVer),
.maxVer = TARRAY2_FIRST(writer->brinBlock->minVer), .maxVer = TARRAY2_FIRST(brinBlock->minVer),
.numRec = BRIN_BLOCK_SIZE(writer->brinBlock), .numRec = BRIN_BLOCK_SIZE(brinBlock),
.cmprAlg = writer->config->cmprAlg, .cmprAlg = cmprAlg,
}, },
}; };
for (int32_t i = 1; i < BRIN_BLOCK_SIZE(writer->brinBlock); i++) { for (int32_t i = 1; i < BRIN_BLOCK_SIZE(brinBlock); i++) {
if (brinBlk->minVer > TARRAY2_GET(writer->brinBlock->minVer, i)) { if (brinBlk->minVer > TARRAY2_GET(brinBlock->minVer, i)) {
brinBlk->minVer = TARRAY2_GET(writer->brinBlock->minVer, i); brinBlk->minVer = TARRAY2_GET(brinBlock->minVer, i);
} }
if (brinBlk->maxVer < TARRAY2_GET(writer->brinBlock->maxVer, i)) { if (brinBlk->maxVer < TARRAY2_GET(brinBlock->maxVer, i)) {
brinBlk->maxVer = TARRAY2_GET(writer->brinBlock->maxVer, i); brinBlk->maxVer = TARRAY2_GET(brinBlock->maxVer, i);
} }
} }
// write to file // write to file
for (int32_t i = 0; i < ARRAY_SIZE(writer->brinBlock->dataArr1); i++) { for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr1 + i), code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr1 + i), TARRAY2_DATA_LEN(brinBlock->dataArr1 + i),
TARRAY2_DATA_LEN(writer->brinBlock->dataArr1 + i), TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[i], &bufArr[1]);
&writer->config->bufArr[0], 0, &brinBlk->size[i], &writer->config->bufArr[1]); if (code) return code;
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0], code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[i]);
brinBlk->size[i]); if (code) return code;
TSDB_CHECK_CODE(code, lino, _exit);
brinBlk->dp->size += brinBlk->size[i]; brinBlk->dp->size += brinBlk->size[i];
writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[i]; *fileSize += brinBlk->size[i];
} }
for (int32_t i = 0, j = ARRAY_SIZE(writer->brinBlock->dataArr1); i < ARRAY_SIZE(writer->brinBlock->dataArr2); for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) {
i++, j++) { code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr2 + i), TARRAY2_DATA_LEN(brinBlock->dataArr2 + i),
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr2 + i), TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[j], &bufArr[1]);
TARRAY2_DATA_LEN(writer->brinBlock->dataArr2 + i), TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, if (code) return code;
&writer->config->bufArr[0], 0, &brinBlk->size[j], &writer->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0], code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[j]);
brinBlk->size[j]); if (code) return code;
TSDB_CHECK_CODE(code, lino, _exit);
brinBlk->dp->size += brinBlk->size[j]; brinBlk->dp->size += brinBlk->size[j];
writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[j]; *fileSize += brinBlk->size[j];
} }
// append to brinBlkArray // append to brinBlkArray
code = TARRAY2_APPEND_PTR(writer->brinBlkArray, brinBlk); code = TARRAY2_APPEND_PTR(brinBlkArray, brinBlk);
TSDB_CHECK_CODE(code, lino, _exit); if (code) return code;
tBrinBlockClear(writer->brinBlock); tBrinBlockClear(brinBlock);
return 0;
}
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
...@@ -1154,53 +1167,10 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { ...@@ -1154,53 +1167,10 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STombBlk tombBlk[1] = {{ code = tsdbFileDoWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
.numRec = TOMB_BLOCK_SIZE(writer->tombBlock), &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr);
.minTbid =
{
.suid = TARRAY2_FIRST(writer->tombBlock->suid),
.uid = TARRAY2_FIRST(writer->tombBlock->uid),
},
.maxTbid =
{
.suid = TARRAY2_LAST(writer->tombBlock->suid),
.uid = TARRAY2_LAST(writer->tombBlock->uid),
},
.minVer = TARRAY2_FIRST(writer->tombBlock->version),
.maxVer = TARRAY2_FIRST(writer->tombBlock->version),
.dp[0] =
{
.offset = writer->files[TSDB_FTYPE_TOMB].size,
.size = 0,
},
}};
for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) {
tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->tombBlock->version, i));
tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->tombBlock->version, i));
}
for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) {
int32_t size;
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]),
TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
&writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size, writer->config->bufArr[0],
size);
TSDB_CHECK_CODE(code, lino, _exit);
tombBlk->size[i] = size;
tombBlk->dp[0].size += size;
writer->files[TSDB_FTYPE_TOMB].size += size;
}
code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tTombBlockClear(writer->tombBlock);
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
...@@ -1214,14 +1184,9 @@ static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) { ...@@ -1214,14 +1184,9 @@ static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t ftype = TSDB_FTYPE_TOMB; code = tsdbFileDoWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr,
writer->tombFooter->tombBlkPtr->offset = writer->files[ftype].size; &writer->files[TSDB_FTYPE_TOMB].size);
writer->tombFooter->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
code = tsdbWriteFile(writer->fd[ftype], writer->tombFooter->tombBlkPtr->offset,
(const uint8_t *)TARRAY2_DATA(writer->tombBlkArray), writer->tombFooter->tombBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
writer->files[ftype].size += writer->tombFooter->tombBlkPtr->size;
_exit: _exit:
if (code) { if (code) {
...@@ -1306,20 +1271,25 @@ _exit: ...@@ -1306,20 +1271,25 @@ _exit:
return code; return code;
} }
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) { int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
ASSERT(TARRAY2_SIZE(writer->brinBlkArray) > 0); ASSERT(TARRAY2_SIZE(brinBlkArray) > 0);
ptr->offset = *fileSize;
ptr->size = TARRAY2_DATA_LEN(brinBlkArray);
int32_t code = tsdbWriteFile(fd, ptr->offset, (uint8_t *)TARRAY2_DATA(brinBlkArray), ptr->size);
if (code) return code;
*fileSize += ptr->size;
return 0;
}
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t ftype = TSDB_FTYPE_HEAD; code = tsdbFileWriteBrinBlk(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlkArray, writer->headFooter->brinBlkPtr,
writer->headFooter->brinBlkPtr->offset = writer->files[ftype].size; &writer->files[TSDB_FTYPE_HEAD].size);
writer->headFooter->brinBlkPtr->size = TARRAY2_DATA_LEN(writer->brinBlkArray);
code = tsdbWriteFile(writer->fd[ftype], writer->headFooter->brinBlkPtr->offset,
(uint8_t *)TARRAY2_DATA(writer->brinBlkArray), writer->headFooter->brinBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
writer->files[ftype].size += writer->headFooter->brinBlkPtr->size;
_exit: _exit:
if (code) { if (code) {
......
...@@ -402,50 +402,63 @@ struct SSttFileWriter { ...@@ -402,50 +402,63 @@ struct SSttFileWriter {
uint8_t *bufArr[5]; uint8_t *bufArr[5];
}; };
static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) { int32_t tsdbFileDoWriteBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
if (writer->blockData->nRow == 0) return 0; TSttBlkArray *sttBlkArray, uint8_t **bufArr) {
if (blockData->nRow == 0) return 0;
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SSttBlk sttBlk[1] = {{ SSttBlk sttBlk[1] = {{
.suid = writer->blockData->suid, .suid = blockData->suid,
.minUid = writer->blockData->uid ? writer->blockData->uid : writer->blockData->aUid[0], .minUid = blockData->uid ? blockData->uid : blockData->aUid[0],
.maxUid = writer->blockData->uid ? writer->blockData->uid : writer->blockData->aUid[writer->blockData->nRow - 1], .maxUid = blockData->uid ? blockData->uid : blockData->aUid[blockData->nRow - 1],
.minKey = writer->blockData->aTSKEY[0], .minKey = blockData->aTSKEY[0],
.maxKey = writer->blockData->aTSKEY[0], .maxKey = blockData->aTSKEY[0],
.minVer = writer->blockData->aVersion[0], .minVer = blockData->aVersion[0],
.maxVer = writer->blockData->aVersion[0], .maxVer = blockData->aVersion[0],
.nRow = writer->blockData->nRow, .nRow = blockData->nRow,
}}; }};
for (int32_t iRow = 1; iRow < writer->blockData->nRow; iRow++) { for (int32_t iRow = 1; iRow < blockData->nRow; iRow++) {
if (sttBlk->minKey > writer->blockData->aTSKEY[iRow]) sttBlk->minKey = writer->blockData->aTSKEY[iRow]; if (sttBlk->minKey > blockData->aTSKEY[iRow]) sttBlk->minKey = blockData->aTSKEY[iRow];
if (sttBlk->maxKey < writer->blockData->aTSKEY[iRow]) sttBlk->maxKey = writer->blockData->aTSKEY[iRow]; if (sttBlk->maxKey < blockData->aTSKEY[iRow]) sttBlk->maxKey = blockData->aTSKEY[iRow];
if (sttBlk->minVer > writer->blockData->aVersion[iRow]) sttBlk->minVer = writer->blockData->aVersion[iRow]; if (sttBlk->minVer > blockData->aVersion[iRow]) sttBlk->minVer = blockData->aVersion[iRow];
if (sttBlk->maxVer < writer->blockData->aVersion[iRow]) sttBlk->maxVer = writer->blockData->aVersion[iRow]; if (sttBlk->maxVer < blockData->aVersion[iRow]) sttBlk->maxVer = blockData->aVersion[iRow];
} }
int32_t sizeArr[5] = {0}; int32_t sizeArr[5] = {0};
code = tCmprBlockData(writer->blockData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr); code = tCmprBlockData(blockData, cmprAlg, NULL, NULL, bufArr, sizeArr);
TSDB_CHECK_CODE(code, lino, _exit); if (code) return code;
sttBlk->bInfo.offset = writer->file->size; sttBlk->bInfo.offset = *fileSize;
sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3]; sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3];
sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey; sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey;
for (int32_t i = 3; i >= 0; i--) { for (int32_t i = 3; i >= 0; i--) {
if (sizeArr[i]) { if (sizeArr[i]) {
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], sizeArr[i]); code = tsdbWriteFile(fd, *fileSize, bufArr[i], sizeArr[i]);
TSDB_CHECK_CODE(code, lino, _exit); if (code) return code;
writer->file->size += sizeArr[i]; *fileSize += sizeArr[i];
} }
} }
code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk); code = TARRAY2_APPEND_PTR(sttBlkArray, sttBlk);
TSDB_CHECK_CODE(code, lino, _exit); if (code) return code;
tBlockDataClear(blockData);
tBlockDataClear(writer->blockData); return 0;
}
static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
if (writer->blockData->nRow == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileDoWriteBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size,
writer->sttBlkArray, writer->config->bufArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
...@@ -516,60 +529,71 @@ _exit: ...@@ -516,60 +529,71 @@ _exit:
return code; return code;
} }
static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0; TTombBlkArray *tombBlkArray, uint8_t **bufArr) {
int32_t code;
int32_t code = 0; if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0;
int32_t lino = 0;
STombBlk tombBlk[1] = {{ STombBlk tombBlk[1] = {{
.dp[0] = .dp[0] =
{ {
.offset = writer->file->size, .offset = *fileSize,
.size = 0, .size = 0,
}, },
.minTbid = .minTbid =
{ {
.suid = TARRAY2_FIRST(writer->tombBlock->suid), .suid = TARRAY2_FIRST(tombBlock->suid),
.uid = TARRAY2_FIRST(writer->tombBlock->uid), .uid = TARRAY2_FIRST(tombBlock->uid),
}, },
.maxTbid = .maxTbid =
{ {
.suid = TARRAY2_LAST(writer->tombBlock->suid), .suid = TARRAY2_LAST(tombBlock->suid),
.uid = TARRAY2_LAST(writer->tombBlock->uid), .uid = TARRAY2_LAST(tombBlock->uid),
}, },
.minVer = TARRAY2_FIRST(writer->tombBlock->version), .minVer = TARRAY2_FIRST(tombBlock->version),
.maxVer = TARRAY2_FIRST(writer->tombBlock->version), .maxVer = TARRAY2_FIRST(tombBlock->version),
.numRec = TOMB_BLOCK_SIZE(writer->tombBlock), .numRec = TOMB_BLOCK_SIZE(tombBlock),
.cmprAlg = writer->config->cmprAlg, .cmprAlg = cmprAlg,
}}; }};
for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) { for (int32_t i = 1; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
if (tombBlk->minVer > TARRAY2_GET(writer->tombBlock->version, i)) { if (tombBlk->minVer > TARRAY2_GET(tombBlock->version, i)) {
tombBlk->minVer = TARRAY2_GET(writer->tombBlock->version, i); tombBlk->minVer = TARRAY2_GET(tombBlock->version, i);
} }
if (tombBlk->maxVer < TARRAY2_GET(writer->tombBlock->version, i)) { if (tombBlk->maxVer < TARRAY2_GET(tombBlock->version, i)) {
tombBlk->maxVer = TARRAY2_GET(writer->tombBlock->version, i); tombBlk->maxVer = TARRAY2_GET(tombBlock->version, i);
} }
} }
for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) { for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]), code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]),
TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &bufArr[0], 0, &tombBlk->size[i], &bufArr[1]);
&writer->config->bufArr[0], 0, &tombBlk->size[i], &writer->config->bufArr[1]); if (code) return code;
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], tombBlk->size[i]); code = tsdbWriteFile(fd, *fileSize, bufArr[0], tombBlk->size[i]);
TSDB_CHECK_CODE(code, lino, _exit); if (code) return code;
tombBlk->dp->size += tombBlk->size[i]; tombBlk->dp->size += tombBlk->size[i];
writer->file->size += tombBlk->size[i]; *fileSize += tombBlk->size[i];
} }
code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk); code = TARRAY2_APPEND_PTR(tombBlkArray, tombBlk);
TSDB_CHECK_CODE(code, lino, _exit); if (code) return code;
tTombBlockClear(writer->tombBlock); tTombBlockClear(tombBlock);
return 0;
}
static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
code = tsdbFileDoWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size,
writer->tombBlkArray, writer->config->bufArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
...@@ -578,18 +602,27 @@ _exit: ...@@ -578,18 +602,27 @@ _exit:
return code; return code;
} }
int32_t tsdbFileDoWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
ptr->size = TARRAY2_DATA_LEN(sttBlkArray);
if (ptr->size > 0) {
ptr->offset = *fileSize;
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(sttBlkArray), ptr->size);
if (code) {
return code;
}
*fileSize += ptr->size;
}
return 0;
}
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray); code = tsdbFileDoWriteSttBlk(writer->fd, writer->sttBlkArray, writer->footer->sttBlkPtr, &writer->file->size);
if (writer->footer->sttBlkPtr->size) { TSDB_CHECK_CODE(code, lino, _exit);
writer->footer->sttBlkPtr->offset = writer->file->size;
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
writer->footer->sttBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += writer->footer->sttBlkPtr->size;
}
_exit: _exit:
if (code) { if (code) {
...@@ -618,18 +651,27 @@ _exit: ...@@ -618,18 +651,27 @@ _exit:
return code; return code;
} }
int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
if (ptr->size > 0) {
ptr->offset = *fileSize;
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size);
if (code) {
return code;
}
*fileSize += ptr->size;
}
return 0;
}
static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray); code = tsdbFileDoWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size);
if (writer->footer->tombBlkPtr->size) { TSDB_CHECK_CODE(code, lino, _exit);
writer->footer->tombBlkPtr->offset = writer->file->size;
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray),
writer->footer->tombBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += writer->footer->tombBlkPtr->size;
}
_exit: _exit:
if (code) { if (code) {
...@@ -638,13 +680,17 @@ _exit: ...@@ -638,13 +680,17 @@ _exit:
return code; return code;
} }
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { int32_t tsdbSttFileDoWriteFooterImpl(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize) {
int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer)); int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer));
if (code) return code; if (code) return code;
writer->file->size += sizeof(writer->footer); *fileSize += sizeof(*footer);
return 0; return 0;
} }
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
return tsdbSttFileDoWriteFooterImpl(writer->fd, writer->footer, &writer->file->size);
}
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
......
...@@ -202,11 +202,15 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * ...@@ -202,11 +202,15 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
} }
if (fd) { if (fd) {
// write footer
// sync and close
code = tsdbFsyncFile(fd); code = tsdbFsyncFile(fd);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbCloseFile(&fd); tsdbCloseFile(&fd);
} }
// clear
TARRAY2_DESTROY(tombBlkArray, NULL); TARRAY2_DESTROY(tombBlkArray, NULL);
tTombBlockDestroy(tombBlock); tTombBlockDestroy(tombBlock);
taosArrayDestroy(aDelData); taosArrayDestroy(aDelData);
...@@ -260,10 +264,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { ...@@ -260,10 +264,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
TFileSetArray fileSetArray[1] = {0}; TFileSetArray fileSetArray[1] = {0};
// load old file system and convert // open old file system
code = tsdbFSOpen(tsdb, rollback); code = tsdbFSOpen(tsdb, rollback);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// upgrade each file set
for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) {
SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i); SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i);
...@@ -271,21 +276,23 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { ...@@ -271,21 +276,23 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
// upgrade tomb file
if (tsdb->fs.pDelFile != NULL) { if (tsdb->fs.pDelFile != NULL) {
code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray); code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
// close file system
code = tsdbFSClose(tsdb); code = tsdbFSClose(tsdb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// save new file system // save new file system
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
current_fname(tsdb, fname, TSDB_FCURRENT); current_fname(tsdb, fname, TSDB_FCURRENT);
code = save_fs(fileSetArray, fname); code = save_fs(fileSetArray, fname);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// clear
TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear); TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear);
_exit: _exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册