提交 96d29bc9 编写于 作者: H Hongze Cheng

more code

上级 d3bf34e9
...@@ -74,7 +74,7 @@ struct SSttFileWriterConfig { ...@@ -74,7 +74,7 @@ struct SSttFileWriterConfig {
STFile file; STFile file;
SSkmInfo *skmTb; SSkmInfo *skmTb;
SSkmInfo *skmRow; SSkmInfo *skmRow;
uint8_t **aBuf; uint8_t **bufArr;
}; };
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -345,14 +345,18 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { ...@@ -345,14 +345,18 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSttBlk sttBlk[1];
SSttBlk sttBlk[1] = {{
sttBlk->suid = writer->bData->suid; .suid = writer->bData->suid,
sttBlk->minUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[0]; .minUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[0],
sttBlk->maxUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[writer->bData->nRow - 1]; .maxUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[writer->bData->nRow - 1],
sttBlk->minKey = sttBlk->maxKey = writer->bData->aTSKEY[0]; .minKey = writer->bData->aTSKEY[0],
sttBlk->minVer = sttBlk->maxVer = writer->bData->aVersion[0]; .maxKey = writer->bData->aTSKEY[0],
sttBlk->nRow = writer->bData->nRow; .minVer = writer->bData->aVersion[0],
.maxVer = writer->bData->aVersion[0],
.nRow = writer->bData->nRow,
}};
for (int32_t iRow = 1; iRow < writer->bData->nRow; iRow++) { for (int32_t iRow = 1; iRow < writer->bData->nRow; iRow++) {
if (sttBlk->minKey > writer->bData->aTSKEY[iRow]) sttBlk->minKey = writer->bData->aTSKEY[iRow]; if (sttBlk->minKey > writer->bData->aTSKEY[iRow]) sttBlk->minKey = writer->bData->aTSKEY[iRow];
if (sttBlk->maxKey < writer->bData->aTSKEY[iRow]) sttBlk->maxKey = writer->bData->aTSKEY[iRow]; if (sttBlk->maxKey < writer->bData->aTSKEY[iRow]) sttBlk->maxKey = writer->bData->aTSKEY[iRow];
...@@ -360,7 +364,7 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { ...@@ -360,7 +364,7 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow]; if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow];
} }
code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->sizeArr); code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, writer->sizeArr);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
sttBlk->bInfo.offset = writer->file->size; sttBlk->bInfo.offset = writer->file->size;
...@@ -369,7 +373,7 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { ...@@ -369,7 +373,7 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
for (int32_t i = 3; i >= 0; i--) { for (int32_t i = 3; i >= 0; i--) {
if (writer->sizeArr[i]) { if (writer->sizeArr[i]) {
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[i], writer->sizeArr[i]); code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], writer->sizeArr[i]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += writer->sizeArr[i]; writer->file->size += writer->sizeArr[i];
} }
...@@ -419,12 +423,12 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { ...@@ -419,12 +423,12 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) { for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) {
int32_t size; int32_t size;
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->sData->dataArr[i]), code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->sData->dataArr + i),
TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
&writer->config->aBuf[0], 0, &size, &writer->config->aBuf[1]); &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size); code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
statisBlk->size[i] = size; statisBlk->size[i] = size;
...@@ -480,14 +484,14 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { ...@@ -480,14 +484,14 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
int32_t size; int32_t size;
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tData->dataArr[i]), code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tData->dataArr[i]),
TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
&writer->config->aBuf[0], 0, &size, &writer->config->aBuf[1]); &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size); code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tombBlk->size[i] = size; tombBlk->size[i] = size;
tombBlk->dp[0].size += size; tombBlk->dp->size += size;
writer->file->size += size; writer->file->size += size;
} }
...@@ -569,21 +573,21 @@ _exit: ...@@ -569,21 +573,21 @@ _exit:
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
writer->footer->prevFooter = writer->config->file.size; writer->footer->prevFooter = writer->config->file.size;
int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer)); int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer));
if (code) return code;
writer->file->size += sizeof(writer->footer); writer->file->size += sizeof(writer->footer);
return code; return 0;
} }
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
// set // set
writer->file[0] = writer->config->file; writer->file[0] = writer->config->file;
writer->file->stt->nseg++; writer->file->stt->nseg++;
if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb; if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow; if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
if (!writer->config->aBuf) writer->config->aBuf = writer->bufArr; if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr;
// open file // open file
int32_t flag; int32_t flag;
...@@ -606,11 +610,11 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { ...@@ -606,11 +610,11 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
writer->file->size += sizeof(hdr); writer->file->size += sizeof(hdr);
} }
writer->ctx->opened = true;
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(vid, lino, code); TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
} else {
writer->ctx->opened = true;
} }
return code; return code;
} }
...@@ -623,8 +627,8 @@ static void tsdbSttFWriterDoClose(SSttFileWriter *writer) { ...@@ -623,8 +627,8 @@ static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
} }
tDestroyTSchema(writer->skmRow->pTSchema); tDestroyTSchema(writer->skmRow->pTSchema);
tDestroyTSchema(writer->skmTb->pTSchema); tDestroyTSchema(writer->skmTb->pTSchema);
tStatisBlockFree(writer->sData);
tTombBlockFree(writer->tData); tTombBlockFree(writer->tData);
tStatisBlockFree(writer->sData);
tBlockDataDestroy(writer->bData); tBlockDataDestroy(writer->bData);
TARRAY2_FREE(writer->tombBlkArray); TARRAY2_FREE(writer->tombBlkArray);
TARRAY2_FREE(writer->statisBlkArray); TARRAY2_FREE(writer->statisBlkArray);
...@@ -783,6 +787,18 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { ...@@ -783,6 +787,18 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
}}; }};
code = tStatisBlockPut(writer->sData, record); code = tStatisBlockPut(writer->sData, record);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else {
TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version);
TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version);
if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) {
TARRAY2_LAST(writer->sData->count)++;
TARRAY2_LAST(writer->sData->lastKey) = key->ts;
TARRAY2_LAST(writer->sData->lastKeyVer) = key->version;
} else if (key->ts == TARRAY2_LAST(writer->sData->lastKey)) {
TARRAY2_LAST(writer->sData->lastKeyVer) = key->version;
} else {
ASSERTS(0, "timestamp should be in ascending order");
}
} }
if (row->row.type == TSDBROW_ROW_FMT) { if (row->row.type == TSDBROW_ROW_FMT) {
...@@ -799,6 +815,10 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { ...@@ -799,6 +815,10 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
: writer->bData->aUid[writer->bData->nRow - 1]) == row->uid // : writer->bData->aUid[writer->bData->nRow - 1]) == row->uid //
&& writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts // && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts //
) { ) {
if (writer->bData->nRow == 1) {
TARRAY2_LAST(writer->sData->firstKeyVer) = key->version;
}
code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema); code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
...@@ -811,18 +831,6 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { ...@@ -811,18 +831,6 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version);
TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version);
if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) {
TARRAY2_LAST(writer->sData->lastKey) = key->ts;
TARRAY2_LAST(writer->sData->lastKeyVer) = key->version;
TARRAY2_LAST(writer->sData->count)++;
} else if (key->ts == TARRAY2_LAST(writer->sData->lastKey)) {
TARRAY2_LAST(writer->sData->lastKeyVer) = key->version;
} else {
ASSERTS(0, "timestamp should be in ascending order");
}
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(vid, lino, code); TSDB_ERROR_LOG(vid, lino, code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册