提交 9cdaf241 编写于 作者: H Hongze Cheng

more code

上级 2c5b5289
...@@ -38,8 +38,6 @@ extern "C" { ...@@ -38,8 +38,6 @@ extern "C" {
typedef void (*TArray2Cb)(void *); typedef void (*TArray2Cb)(void *);
#define TARRAY2_MIN_SIZE 16
#define TARRAY2_SIZE(a) ((a)->size) #define TARRAY2_SIZE(a) ((a)->size)
#define TARRAY2_CAPACITY(a) ((a)->capacity) #define TARRAY2_CAPACITY(a) ((a)->capacity)
#define TARRAY2_DATA(a) ((a)->data) #define TARRAY2_DATA(a) ((a)->data)
...@@ -55,7 +53,7 @@ static FORCE_INLINE int32_t tarray2_make_room( // ...@@ -55,7 +53,7 @@ static FORCE_INLINE int32_t tarray2_make_room( //
int32_t sz // size of element int32_t sz // size of element
) { ) {
TARRAY2(void) *a = arg; TARRAY2(void) *a = arg;
int32_t capacity = (a->capacity > 0) ? (a->capacity << 1) : TARRAY2_MIN_SIZE; int32_t capacity = (a->capacity > 0) ? (a->capacity << 1) : 32;
while (capacity < es) { while (capacity < es) {
capacity <<= 1; capacity <<= 1;
} }
......
...@@ -49,10 +49,11 @@ int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **b ...@@ -49,10 +49,11 @@ int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **b
int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock); int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinBlk, SBrinBlock *brinBlock);
// .data // .data
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData); int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData);
int32_t tsdbDataFileReadBlockDataByCol(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData, int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
STSchema *pTSchema, int32_t cidArr[], int32_t numCid); STSchema *pTSchema, int16_t cids[], int32_t ncid);
// .sma // .sma
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader); int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
TColumnDataAggArray *columnDataAggArray);
// .tomb // .tomb
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray); int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray);
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData); int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData);
......
...@@ -44,7 +44,9 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk ...@@ -44,7 +44,9 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk
int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray); int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray);
int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **delBlkArray); int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **delBlkArray);
int32_t tsdbSttFileReadDataBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData); int32_t tsdbSttFileReadBlockData(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData);
int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
STSchema *pTSchema, int16_t cids[], int32_t ncid);
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData); int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData);
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData); int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData);
...@@ -61,8 +63,8 @@ typedef struct SSttFileWriterConfig SSttFileWriterConfig; ...@@ -61,8 +63,8 @@ typedef struct SSttFileWriterConfig SSttFileWriterConfig;
int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer); int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer);
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray); int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray);
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row); int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row);
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData); int32_t tsdbSttFileWriteBlockData(SSttFileWriter *writer, SBlockData *pBlockData);
int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record); int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record);
bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer); bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer);
......
...@@ -198,7 +198,7 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { ...@@ -198,7 +198,7 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid); code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
code = tsdbSttFileWriteTSData(committer->sttWriter, row); code = tsdbSttFileWriteRow(committer->sttWriter, row);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger); code = tsdbIterMergerNext(committer->iterMerger);
......
...@@ -268,13 +268,109 @@ _exit: ...@@ -268,13 +268,109 @@ _exit:
return code; return code;
} }
int32_t tsdbDataFileReadBlockDataByCol(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData, int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
STSchema *pTSchema, int32_t cidArr[], int32_t numCid) { STSchema *pTSchema, int16_t cids[], int32_t ncid) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// TODO code = tBlockDataInit(bData, (TABLEID *)record, pTSchema, cids, ncid);
ASSERT(0); TSDB_CHECK_CODE(code, lino, _exit);
// uid + version + tskey
code = tRealloc(&reader->config->bufArr[0], record->blockKeySize);
TSDB_CHECK_CODE(code, lino, _exit);
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockKeySize);
TSDB_CHECK_CODE(code, lino, _exit);
// hdr
SDiskDataHdr hdr[1];
int32_t size = 0;
size += tGetDiskDataHdr(reader->config->bufArr[0] + size, hdr);
ASSERT(hdr->delimiter == TSDB_FILE_DLMT);
ASSERT(record->uid == hdr->uid);
bData->nRow = hdr->nRow;
// uid
ASSERT(hdr->uid);
// version
code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += hdr->szVer;
// ts
code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg,
(uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += hdr->szKey;
ASSERT(size == record->blockKeySize);
// other columns
if (bData->nColData > 0) {
if (hdr->szBlkCol > 0) {
code = tRealloc(&reader->config->bufArr[0], hdr->szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
reader->config->bufArr[0], hdr->szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
}
SBlockCol bc[1] = {{.cid = 0}};
SBlockCol *blockCol = bc;
size = 0;
for (int32_t i = 0; i < bData->nColData; i++) {
SColData *colData = tBlockDataGetColDataByIdx(bData, i);
while (blockCol && blockCol->cid < colData->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
}
}
if (blockCol == NULL || blockCol->cid > colData->cid) {
for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
code = tColDataAppendValue(colData, &COL_VAL_NONE(colData->cid, colData->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
ASSERT(blockCol->type == colData->type);
ASSERT(blockCol->flag && blockCol->flag != HAS_NONE);
if (blockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
code = tColDataAppendValue(colData, &COL_VAL_NULL(blockCol->cid, blockCol->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
code = tRealloc(&reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA],
record->blockOffset + record->blockKeySize + hdr->szBlkCol + blockCol->offset,
reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
&reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
}
_exit: _exit:
if (code) { if (code) {
...@@ -283,12 +379,31 @@ _exit: ...@@ -283,12 +379,31 @@ _exit:
return code; return code;
} }
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader) { int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
TColumnDataAggArray *columnDataAggArray) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// TODO TARRAY2_CLEAR(columnDataAggArray, NULL);
ASSERT(0); if (record->smaSize > 0) {
code = tRealloc(&reader->config->bufArr[0], record->smaSize);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, reader->config->bufArr[0], record->smaSize);
TSDB_CHECK_CODE(code, lino, _exit);
// decode sma data
int32_t size = 0;
while (size < record->smaSize) {
SColumnDataAgg sma[1];
size += tGetColumnDataAgg(reader->config->bufArr[0] + size, sma);
code = TARRAY2_APPEND_PTR(columnDataAggArray, sma);
TSDB_CHECK_CODE(code, lino, _exit);
}
ASSERT(size == record->smaSize);
}
_exit: _exit:
if (code) { if (code) {
...@@ -798,37 +913,28 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData ...@@ -798,37 +913,28 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
} }
// to .sma file // to .sma file
TColumnDataAggArray smaArr[1];
TARRAY2_INIT(smaArr);
for (int32_t i = 0; i < bData->nColData; ++i) { for (int32_t i = 0; i < bData->nColData; ++i) {
SColData *colData = bData->aColData + i; SColData *colData = bData->aColData + i;
if ((!colData->smaOn) || ((colData->flag & HAS_VALUE) == 0)) continue;
if ((!colData->smaOn) //
|| ((colData->flag & HAS_VALUE) == 0) //
) {
continue;
}
SColumnDataAgg sma[1] = {{.colId = colData->cid}}; SColumnDataAgg sma[1] = {{.colId = colData->cid}};
tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull); tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull);
code = TARRAY2_APPEND_PTR(smaArr, sma); int32_t size = tPutColumnDataAgg(NULL, sma);
code = tRealloc(&writer->config->bufArr[0], record->smaSize + size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
record->smaSize = TARRAY2_DATA_LEN(smaArr); tPutColumnDataAgg(writer->config->bufArr[0] + record->smaSize, sma);
record->smaSize += size;
}
if (record->smaSize > 0) { if (record->smaSize > 0) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, (const uint8_t *)TARRAY2_DATA(smaArr), code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, writer->config->bufArr[0], record->smaSize);
record->smaSize);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_SMA].size += record->smaSize; writer->files[TSDB_FTYPE_SMA].size += record->smaSize;
} }
TARRAY2_DESTROY(smaArr, NULL);
// append SBrinRecord // append SBrinRecord
code = tsdbDataFileWriteBrinRecord(writer, record); code = tsdbDataFileWriteBrinRecord(writer, record);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
...@@ -100,7 +100,7 @@ static int32_t tsdbSttIterNext(STsdbIter *iter, const TABLEID *tbid) { ...@@ -100,7 +100,7 @@ static int32_t tsdbSttIterNext(STsdbIter *iter, const TABLEID *tbid) {
continue; continue;
} }
int32_t code = tsdbSttFileReadDataBlock(iter->stt->reader, sttBlk, iter->stt->bData); int32_t code = tsdbSttFileReadBlockData(iter->stt->reader, sttBlk, iter->stt->bData);
if (code) return code; if (code) return code;
iter->stt->iRow = 0; iter->stt->iRow = 0;
......
...@@ -155,7 +155,7 @@ static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { ...@@ -155,7 +155,7 @@ static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (merger->ctx->bData[cidx].nRow < merger->minRow) { if (merger->ctx->bData[cidx].nRow < merger->minRow) {
code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData + cidx); code = tsdbSttFileWriteBlockData(merger->sttWriter, merger->ctx->bData + cidx);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->bData + cidx); code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->bData + cidx);
...@@ -274,7 +274,7 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) { ...@@ -274,7 +274,7 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
// data // data
SRowInfo *row; SRowInfo *row;
while ((row = tsdbIterMergerGet(merger->dataIterMerger))) { while ((row = tsdbIterMergerGet(merger->dataIterMerger))) {
code = tsdbSttFileWriteTSData(merger->sttWriter, row); code = tsdbSttFileWriteRow(merger->sttWriter, row);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(merger->dataIterMerger); code = tsdbIterMergerNext(merger->dataIterMerger);
......
...@@ -228,7 +228,7 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk ...@@ -228,7 +228,7 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk
return 0; return 0;
} }
int32_t tsdbSttFileReadDataBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) { int32_t tsdbSttFileReadBlockData(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -250,6 +250,125 @@ _exit: ...@@ -250,6 +250,125 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFileReadBlockDataByColumn(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
STSchema *pTSchema, int16_t cids[], int32_t ncid) {
int32_t code = 0;
int32_t lino = 0;
TABLEID tbid = {.suid = sttBlk->suid, .uid = 0};
code = tBlockDataInit(bData, &tbid, pTSchema, cids, ncid);
TSDB_CHECK_CODE(code, lino, _exit);
// uid + version + tskey
code = tRealloc(&reader->reader->config->bufArr[0], sttBlk->bInfo.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset, reader->reader->config->bufArr[0], sttBlk->bInfo.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
// hdr
SDiskDataHdr hdr[1];
int32_t size = 0;
size += tGetDiskDataHdr(reader->reader->config->bufArr[0] + size, hdr);
ASSERT(hdr->delimiter == TSDB_FILE_DLMT);
bData->nRow = hdr->nRow;
bData->uid = hdr->uid;
// uid
if (hdr->uid == 0) {
ASSERT(hdr->szUid);
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szUid, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aUid, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERT(hdr->szUid == 0);
}
size += hdr->szUid;
// version
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += hdr->szVer;
// ts
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg,
(uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += hdr->szKey;
ASSERT(size == sttBlk->bInfo.szKey);
// other columns
if (bData->nColData > 0) {
if (hdr->szBlkCol > 0) {
code = tRealloc(&reader->reader->config->bufArr[0], hdr->szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey,
reader->reader->config->bufArr[0], hdr->szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
}
SBlockCol bc[1] = {{.cid = 0}};
SBlockCol *blockCol = bc;
size = 0;
for (int32_t i = 0; i < bData->nColData; i++) {
SColData *colData = tBlockDataGetColDataByIdx(bData, i);
while (blockCol && blockCol->cid < colData->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
}
}
if (blockCol == NULL || blockCol->cid > colData->cid) {
for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
code = tColDataAppendValue(colData, &COL_VAL_NONE(colData->cid, colData->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
ASSERT(blockCol->type == colData->type);
ASSERT(blockCol->flag && blockCol->flag != HAS_NONE);
if (blockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
code = tColDataAppendValue(colData, &COL_VAL_NULL(blockCol->cid, blockCol->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
code = tRealloc(&reader->reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->reader->fd,
sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset,
reader->reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
&reader->reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->reader->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) { int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -341,7 +460,7 @@ struct SSttFileWriter { ...@@ -341,7 +460,7 @@ struct SSttFileWriter {
STsdbFD *fd; STsdbFD *fd;
}; };
static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
if (writer->bData->nRow == 0) return 0; if (writer->bData->nRow == 0) return 0;
int32_t code = 0; int32_t code = 0;
...@@ -647,7 +766,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o ...@@ -647,7 +766,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
int32_t lino; int32_t lino;
int32_t code; int32_t code;
code = tsdbSttFileDoWriteTSDataBlock(writer); code = tsdbSttFileDoWriteBlockData(writer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbSttFileDoWriteStatisBlock(writer); code = tsdbSttFileDoWriteStatisBlock(writer);
...@@ -752,7 +871,7 @@ _exit: ...@@ -752,7 +871,7 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -763,7 +882,7 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { ...@@ -763,7 +882,7 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
TSDBKEY key[1] = {TSDBROW_KEY(&row->row)}; TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};
if (!TABLE_SAME_SCHEMA(row->suid, row->uid, writer->ctx->tbid->suid, writer->ctx->tbid->uid)) { if (!TABLE_SAME_SCHEMA(row->suid, row->uid, writer->ctx->tbid->suid, writer->ctx->tbid->uid)) {
code = tsdbSttFileDoWriteTSDataBlock(writer); code = tsdbSttFileDoWriteBlockData(writer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb); code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb);
...@@ -823,7 +942,7 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { ...@@ -823,7 +942,7 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
if (writer->bData->nRow >= writer->config->maxRow) { if (writer->bData->nRow >= writer->config->maxRow) {
code = tsdbSttFileDoWriteTSDataBlock(writer); code = tsdbSttFileDoWriteBlockData(writer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -838,7 +957,7 @@ _exit: ...@@ -838,7 +957,7 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) { int32_t tsdbSttFileWriteBlockData(SSttFileWriter *writer, SBlockData *bdata) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -849,7 +968,7 @@ int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) { ...@@ -849,7 +968,7 @@ int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) {
row->uid = bdata->uid ? bdata->uid : bdata->aUid[i]; row->uid = bdata->uid ? bdata->uid : bdata->aUid[i];
row->row = tsdbRowFromBlockData(bdata, i); row->row = tsdbRowFromBlockData(bdata, i);
code = tsdbSttFileWriteTSData(writer, row); code = tsdbSttFileWriteRow(writer, row);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -869,7 +988,7 @@ int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *re ...@@ -869,7 +988,7 @@ int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *re
return code; return code;
} else { } else {
if (writer->bData->nRow > 0) { if (writer->bData->nRow > 0) {
code = tsdbSttFileDoWriteTSDataBlock(writer); code = tsdbSttFileDoWriteBlockData(writer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
......
...@@ -1489,7 +1489,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) { ...@@ -1489,7 +1489,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
} }
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) { int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, INT64_MAX); return tsdbReadDelDatav1(pReader, pDelIdx, aDelData, INT64_MAX);
} }
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) { int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
...@@ -1517,10 +1517,10 @@ int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelDa ...@@ -1517,10 +1517,10 @@ int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelDa
if (delData.version > maxVer) { if (delData.version > maxVer) {
continue; continue;
} }
if (taosArrayPush(aDelData, &delData) == NULL) { if (taosArrayPush(aDelData, &delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
} }
ASSERT(n == size); ASSERT(n == size);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册