提交 a9fcebbd 编写于 作者: H Hongze Cheng

more code

上级 d97f3493
......@@ -14,6 +14,10 @@
*/
#include "tsdb.h"
#include "tsdbDataFileRW.h"
#include "tsdbFS2.h"
#include "tsdbIter.h"
#include "tsdbSttFileRW.h"
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
......@@ -21,546 +25,483 @@ extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData,
// STsdbSnapReader ========================================
struct STsdbSnapReader {
STsdb* pTsdb;
int64_t sver;
int64_t ever;
int8_t type;
STsdb* tsdb;
int64_t sver;
int64_t ever;
int8_t type;
uint8_t* aBuf[5];
SSkmInfo skmTb[1];
TFileSetArray* fsetArr;
// context
struct {
int32_t fsetArrIdx;
STFileSet* fset;
bool isDataDone;
bool isTombDone;
} ctx[1];
STsdbFS fs;
TABLEID tbid;
SSkmInfo skmTable;
// timeseries data
int8_t dataDone;
int32_t fid;
SDataFReader* pDataFReader;
STsdbDataIter2* iterList;
STsdbDataIter2* pIter;
SRBTree rbt;
SBlockData bData;
// tombstone data
int8_t delDone;
SDelFReader* pDelFReader;
STsdbDataIter2* pTIter;
SArray* aDelData;
// reader
SDataFileReader* dataReader;
TSttFileReaderArray sttReaderArr[1];
// iter
TTsdbIterArray dataIterArr[1];
SIterMerger* dataIterMerger;
TTsdbIterArray tombIterArr[1];
SIterMerger* tombIterMerger;
// data
SBlockData blockData[1];
STombBlock tombBlock[1];
};
static int32_t tsdbSnapReadFileDataStart(STsdbSnapReader* pReader) {
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
int32_t code = 0;
int32_t lino = 0;
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);
if (pSet == NULL) {
pReader->fid = INT32_MAX;
goto _exit;
ASSERT(reader->dataReader == NULL);
ASSERT(TARRAY2_SIZE(reader->sttReaderArr) == 0);
// data
SDataFileReaderConfig config = {
.tsdb = reader->tsdb,
.szPage = reader->tsdb->pVnode->config.tsdbPageSize,
.bufArr = reader->aBuf,
};
bool hasDataFile = false;
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
if (reader->ctx->fset->farr[ftype] != NULL) {
hasDataFile = true;
config.files[ftype].exist = true;
config.files[ftype].file = reader->ctx->fset->farr[ftype]->f[0];
}
}
pReader->fid = pSet->fid;
if (hasDataFile) {
code = tsdbDataFileReaderOpen(NULL, &config, &reader->dataReader);
TSDB_CHECK_CODE(code, lino, _exit);
}
tRBTreeCreate(&pReader->rbt, tsdbDataIterCmprFn);
// stt
SSttLvl* lvl;
TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) {
STFileObj* fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
SSttFileReader* sttReader;
SSttFileReaderConfig config = {
.tsdb = reader->tsdb,
.szPage = reader->tsdb->pVnode->config.tsdbPageSize,
.file = fobj->f[0],
.bufArr = reader->aBuf,
};
code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(reader->sttReaderArr, sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbOpenDataFileDataIter(pReader->pDataFReader, &pReader->pIter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
int32_t code = 0;
int32_t lino = 0;
TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
tsdbDataFileReaderClose(&reader->dataReader);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) {
int32_t code = 0;
int32_t lino = 0;
ASSERT(reader->dataIterMerger == NULL);
ASSERT(reader->tombIterMerger == NULL);
ASSERT(TARRAY2_SIZE(reader->dataIterArr) == 0);
ASSERT(TARRAY2_SIZE(reader->tombIterArr) == 0);
STsdbIter* iter;
STsdbIterConfig config = {
.filterByVersion = true,
.verRange[0] = reader->sver,
.verRange[1] = reader->ever,
};
// data file
if (reader->dataReader) {
// data
config.type = TSDB_ITER_TYPE_DATA;
config.dataReader = reader->dataReader;
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pReader->pIter) {
// iter to next with filter info (sver, ever)
code = tsdbDataIterNext2(
pReader->pIter,
&(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE, // flag
.sver = pReader->sver,
.ever = pReader->ever});
code = TARRAY2_APPEND(reader->dataIterArr, iter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) {
// add to rbtree
tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
// tomb
config.type = TSDB_ITER_TYPE_DATA_TOMB;
config.dataReader = reader->dataReader;
// add to iterList
pReader->pIter->next = pReader->iterList;
pReader->iterList = pReader->pIter;
} else {
tsdbCloseDataIter2(pReader->pIter);
}
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(reader->tombIterArr, iter);
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
code = tsdbOpenSttFileDataIter(pReader->pDataFReader, iStt, &pReader->pIter);
// stt file
SSttFileReader* sttReader;
TARRAY2_FOREACH(reader->sttReaderArr, sttReader) {
// data
config.type = TSDB_ITER_TYPE_STT;
config.sttReader = sttReader;
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pReader->pIter) {
// iter to valid row
code = tsdbDataIterNext2(
pReader->pIter,
&(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE, // flag
.sver = pReader->sver,
.ever = pReader->ever});
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(reader->dataIterArr, iter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) {
// add to rbtree
tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
// tomb
config.type = TSDB_ITER_TYPE_STT_TOMB;
config.sttReader = sttReader;
// add to iterList
pReader->pIter->next = pReader->iterList;
pReader->iterList = pReader->pIter;
} else {
tsdbCloseDataIter2(pReader->pIter);
}
}
code = tsdbIterOpen(&config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(reader->tombIterArr, iter);
TSDB_CHECK_CODE(code, lino, _exit);
}
pReader->pIter = NULL;
// merger
code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerOpen(reader->tombIterArr, &reader->dataIterMerger, true);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->fid);
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
static void tsdbSnapReadFileDataEnd(STsdbSnapReader* pReader) {
while (pReader->iterList) {
STsdbDataIter2* pIter = pReader->iterList;
pReader->iterList = pIter->next;
tsdbCloseDataIter2(pIter);
}
tsdbDataFReaderClose(&pReader->pDataFReader);
static int32_t tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
tsdbIterMergerClose(&reader->dataIterMerger);
tsdbIterMergerClose(&reader->tombIterMerger);
TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
return 0;
}
static int32_t tsdbSnapReadNextRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
static int32_t tsdbSnapReadFileSetBegin(STsdbSnapReader* reader) {
int32_t code = 0;
int32_t lino = 0;
if (pReader->pIter) {
code = tsdbDataIterNext2(pReader->pIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION |
TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE, // flag
.sver = pReader->sver,
.ever = pReader->ever});
TSDB_CHECK_CODE(code, lino, _exit);
if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) {
reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++);
reader->ctx->isDataDone = false;
reader->ctx->isTombDone = false;
if (pReader->pIter->rowInfo.suid == 0 && pReader->pIter->rowInfo.uid == 0) {
pReader->pIter = NULL;
} else {
SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt);
if (pNode) {
int32_t c = tsdbDataIterCmprFn(&pReader->pIter->rbtn, pNode);
if (c > 0) {
tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
pReader->pIter = NULL;
} else if (c == 0) {
ASSERT(0);
}
}
}
}
if (pReader->pIter == NULL) {
SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt);
if (pNode) {
tRBTreeDrop(&pReader->rbt, pNode);
pReader->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
}
}
code = tsdbSnapReadFileSetOpenReader(reader);
TSDB_CHECK_CODE(code, lino, _exit);
if (ppRowInfo) {
if (pReader->pIter) {
*ppRowInfo = &pReader->pIter->rowInfo;
} else {
*ppRowInfo = NULL;
}
code = tsdbSnapReadFileSetOpenIter(reader);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
static int32_t tsdbSnapReadGetRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
if (pReader->pIter) {
*ppRowInfo = &pReader->pIter->rowInfo;
return 0;
}
return tsdbSnapReadNextRow(pReader, ppRowInfo);
static int32_t tsdbSnapReadFileSetEnd(STsdbSnapReader* reader) {
tsdbSnapReadFileSetCloseIter(reader);
tsdbSnapReadFileSetCloseReader(reader);
reader->ctx->fset = NULL;
return 0;
}
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0;
ASSERT(pReader->bData.nRow);
int32_t lino = 0;
int32_t aBufN[5] = {0};
code = tCmprBlockData(&pReader->bData, NO_COMPRESSION, NULL, NULL, pReader->aBuf, aBufN);
if (code) goto _exit;
code = tCmprBlockData(reader->blockData, NO_COMPRESSION, NULL, NULL, reader->aBuf, aBufN);
TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*ppData == NULL) {
*data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*data == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
pHdr->type = pReader->type;
SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
pHdr->type = reader->type;
pHdr->size = size;
memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]);
memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]);
memcpy(pHdr->data, reader->aBuf[3], aBufN[3]);
memcpy(pHdr->data + aBufN[3], reader->aBuf[2], aBufN[2]);
if (aBufN[1]) {
memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]);
memcpy(pHdr->data + aBufN[3] + aBufN[2], reader->aBuf[1], aBufN[1]);
}
if (aBufN[0]) {
memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]);
memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], reader->aBuf[0], aBufN[0]);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** ppData) {
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0;
int32_t lino = 0;
STsdb* pTsdb = pReader->pTsdb;
tBlockDataReset(reader->blockData);
tBlockDataReset(&pReader->bData);
for (SRowInfo* row; (row = tsdbIterMergerGetData(reader->dataIterMerger));) {
if (reader->blockData->suid == 0 && reader->blockData->uid == 0) {
code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
TSDB_CHECK_CODE(code, lino, _exit);
for (;;) {
// start a new file read if need
if (pReader->pDataFReader == NULL) {
code = tsdbSnapReadFileDataStart(pReader);
TABLEID tbid = {
.suid = row->suid,
.uid = row->suid ? 0 : row->uid,
};
code = tBlockDataInit(reader->blockData, &tbid, reader->skmTb->pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pReader->pDataFReader == NULL) break;
SRowInfo* pRowInfo;
code = tsdbSnapReadGetRow(pReader, &pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
if (pRowInfo == NULL) {
tsdbSnapReadFileDataEnd(pReader);
continue;
if (!TABLE_SAME_SCHEMA(reader->blockData->suid, reader->blockData->uid, row->suid, row->uid)) {
break;
}
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, pRowInfo->suid, pRowInfo->uid, &pReader->skmTable);
code = tBlockDataAppendRow(reader->blockData, &row->row, NULL, row->uid);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataInit(&pReader->bData, (TABLEID*)pRowInfo, pReader->skmTable.pTSchema, NULL, 0);
code = tsdbIterMergerNext(reader->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
do {
if (!TABLE_SAME_SCHEMA(pReader->bData.suid, pReader->bData.uid, pRowInfo->suid, pRowInfo->uid)) break;
if (pReader->bData.uid && pReader->bData.uid != pRowInfo->uid) {
code = tRealloc((uint8_t**)&pReader->bData.aUid, sizeof(int64_t) * (pReader->bData.nRow + 1));
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iRow = 0; iRow < pReader->bData.nRow; ++iRow) {
pReader->bData.aUid[iRow] = pReader->bData.uid;
}
pReader->bData.uid = 0;
}
code = tBlockDataAppendRow(&pReader->bData, &pRowInfo->row, NULL, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbSnapReadNextRow(pReader, &pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
if (pReader->bData.nRow >= 81920) break;
} while (pRowInfo);
ASSERT(pReader->bData.nRow > 0);
break;
if (reader->blockData->nRow >= 81920) {
break;
}
}
if (pReader->bData.nRow > 0) {
ASSERT(pReader->bData.suid || pReader->bData.uid);
code = tsdbSnapCmprData(pReader, ppData);
if (reader->blockData->nRow > 0) {
ASSERT(reader->blockData->suid || reader->blockData->uid);
code = tsdbSnapCmprData(reader, data);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0;
int32_t lino = 0;
int64_t size = sizeof(TABLEID);
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) {
size += tPutDelData(NULL, taosArrayGet(pReader->aDelData, iDelData));
int64_t size = sizeof(SSnapDataHdr);
for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->dataArr); i++) {
size += TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i);
}
uint8_t* pData = (uint8_t*)taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (pData == NULL) {
data[0] = taosMemoryMalloc(size);
if (data[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
pHdr->type = SNAP_DATA_DEL;
pHdr->size = size;
SSnapDataHdr* hdr = (SSnapDataHdr*)data[0];
hdr->type = SNAP_DATA_DEL;
hdr->size = size;
TABLEID* pId = (TABLEID*)(pData + sizeof(SSnapDataHdr));
*pId = pReader->tbid;
size = sizeof(SSnapDataHdr) + sizeof(TABLEID);
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) {
size += tPutDelData(pData + size, taosArrayGet(pReader->aDelData, iDelData));
uint8_t* tdata = hdr->data;
for (int32_t i = 0; i < TARRAY_SIZE(reader->tombBlock->dataArr); i++) {
memcpy(tdata, TARRAY2_DATA(reader->tombBlock->dataArr + i), TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i));
tdata += TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
*ppData = pData;
return code;
}
static void tsdbSnapReadGetTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) {
if (pReader->pTIter == NULL || (pReader->pTIter->delInfo.suid == 0 && pReader->pTIter->delInfo.uid == 0)) {
*ppDelInfo = NULL;
} else {
*ppDelInfo = &pReader->pTIter->delInfo;
}
}
static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) {
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbDataIterNext2(
pReader->pTIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE,
.sver = pReader->sver,
.ever = pReader->ever});
TSDB_CHECK_CODE(code, lino, _exit);
if (ppDelInfo) {
tsdbSnapReadGetTombData(pReader, ppDelInfo);
}
tTombBlockClear(reader->tombBlock);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
int32_t lino = 0;
STsdb* pTsdb = pReader->pTsdb;
// open tombstone data iter if need
if (pReader->pDelFReader == NULL) {
if (pReader->fs.pDelFile == NULL) goto _exit;
// open
code = tsdbDelFReaderOpen(&pReader->pDelFReader, pReader->fs.pDelFile, pTsdb);
for (STombRecord* record; (record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL;) {
code = tTombBlockPut(reader->tombBlock, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pReader->pTIter) {
code = tsdbSnapReadNextTombData(pReader, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
break;
}
}
// loop to get tombstone data
SDelInfo* pDelInfo;
tsdbSnapReadGetTombData(pReader, &pDelInfo);
if (pDelInfo == NULL) goto _exit;
pReader->tbid = *(TABLEID*)pDelInfo;
if (pReader->aDelData) {
taosArrayClear(pReader->aDelData);
} else if ((pReader->aDelData = taosArrayInit(16, sizeof(SDelData))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) {
if (taosArrayPush(pReader->aDelData, &pDelInfo->delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbSnapReadNextTombData(pReader, &pDelInfo);
TSDB_CHECK_CODE(code, lino, _exit);
}
// encode tombstone data
if (taosArrayGetSize(pReader->aDelData) > 0) {
code = tsdbSnapCmprTombData(pReader, ppData);
if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) {
code = tsdbSnapCmprTombData(reader, data);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
}
return code;
}
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** reader) {
int32_t code = 0;
int32_t lino = 0;
// alloc
STsdbSnapReader* pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pReader->pTsdb = pTsdb;
pReader->sver = sver;
pReader->ever = ever;
pReader->type = type;
reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0]));
if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
taosThreadRwlockRdlock(&pTsdb->rwLock);
code = tsdbFSRef(pTsdb, &pReader->fs);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
// init
pReader->fid = INT32_MIN;
reader[0]->tsdb = tsdb;
reader[0]->sver = sver;
reader[0]->ever = ever;
reader[0]->type = type;
code = tBlockDataCreate(&pReader->bData);
code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode),
tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
__func__, lino, tstrerror(code), sver, ever, type);
if (pReader) {
tBlockDataDestroy(&pReader->bData);
tsdbFSUnref(pTsdb, &pReader->fs);
taosMemoryFree(pReader);
pReader = NULL;
}
tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr);
taosMemoryFree(reader[0]);
reader[0] = NULL;
} else {
tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), __func__, sver, ever,
tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, sver, ever,
type);
}
*ppReader = pReader;
return code;
}
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
if (reader[0] == NULL) return 0;
int32_t code = 0;
int32_t lino = 0;
STsdbSnapReader* pReader = *ppReader;
STsdb* pTsdb = pReader->pTsdb;
STsdb* tsdb = reader[0]->tsdb;
// tombstone
if (pReader->pTIter) {
tsdbCloseDataIter2(pReader->pTIter);
pReader->pTIter = NULL;
}
if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader);
}
taosArrayDestroy(pReader->aDelData);
tTombBlockDestroy(reader[0]->tombBlock);
tBlockDataDestroy(reader[0]->blockData);
// timeseries
while (pReader->iterList) {
STsdbDataIter2* pIter = pReader->iterList;
pReader->iterList = pIter->next;
tsdbCloseDataIter2(pIter);
}
if (pReader->pDataFReader) {
tsdbDataFReaderClose(&pReader->pDataFReader);
}
tBlockDataDestroy(&pReader->bData);
tsdbIterMergerClose(&reader[0]->dataIterMerger);
tsdbIterMergerClose(&reader[0]->tombIterMerger);
TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose);
TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose);
TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
tsdbDataFileReaderClose(&reader[0]->dataReader);
// other
tDestroyTSchema(pReader->skmTable.pTSchema);
tsdbFSUnref(pReader->pTsdb, &pReader->fs);
for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
tFree(pReader->aBuf[iBuf]);
tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr);
tDestroyTSchema(reader[0]->skmTb->pTSchema);
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->aBuf);) {
tFree(reader[0]->aBuf[i]);
}
taosMemoryFree(pReader);
taosMemoryFree(reader[0]);
reader[0] = NULL;
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
}
*ppReader = NULL;
return code;
}
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0;
int32_t lino = 0;
*ppData = NULL;
data[0] = NULL;
// read data file
if (!pReader->dataDone) {
code = tsdbSnapReadTimeSeriesData(pReader, ppData);
TSDB_CHECK_CODE(code, lino, _exit);
if (*ppData) {
goto _exit;
} else {
pReader->dataDone = 1;
for (;;) {
if (reader->ctx->fset == NULL) {
code = tsdbSnapReadFileSetBegin(reader);
TSDB_CHECK_CODE(code, lino, _exit);
if (reader->ctx->fset == NULL) {
break;
}
}
}
// read del file
if (!pReader->delDone) {
code = tsdbSnapReadTombData(pReader, ppData);
TSDB_CHECK_CODE(code, lino, _exit);
if (*ppData) {
goto _exit;
} else {
pReader->delDone = 1;
if (!reader->ctx->isDataDone) {
code = tsdbSnapReadTimeSeriesData(reader, data);
TSDB_CHECK_CODE(code, lino, _exit);
if (data[0]) {
goto _exit;
} else {
reader->ctx->isDataDone = true;
}
}
if (!reader->ctx->isTombDone) {
code = tsdbSnapReadTombData(reader, data);
TSDB_CHECK_CODE(code, lino, _exit);
if (data[0]) {
goto _exit;
} else {
reader->ctx->isTombDone = true;
}
}
code = tsdbSnapReadFileSetEnd(reader);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
} else {
tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__);
tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
}
return code;
}
// STsdbSnapWriter ========================================
struct STsdbSnapWriter {
STsdb* pTsdb;
STsdb* tsdb;
int64_t sver;
int64_t ever;
int32_t minutes;
......@@ -571,41 +512,37 @@ struct STsdbSnapWriter {
int64_t commitID;
uint8_t* aBuf[5];
STsdbFS fs;
TABLEID tbid;
// time-series data
SBlockData inData;
int32_t fid;
SSkmInfo skmTable;
/* reader */
SDataFReader* pDataFReader;
STsdbDataIter2* iterList;
STsdbDataIter2* pDIter;
STsdbDataIter2* pSIter;
SRBTree rbt; // SRBTree<STsdbDataIter2>
/* writer */
SDataFWriter* pDataFWriter;
SArray* aBlockIdx;
SMapData mDataBlk; // SMapData<SDataBlk>
SArray* aSttBlk; // SArray<SSttBlk>
SBlockData bData;
SBlockData sData;
// tombstone data
/* reader */
SDelFReader* pDelFReader;
STsdbDataIter2* pTIter;
/* writer */
SDelFWriter* pDelFWriter;
SArray* aDelIdx;
SArray* aDelData;
TFileSetArray* fsetArr;
TFileOpArray fopArr[1];
struct {
bool fsetWriteBegin;
int32_t fid;
STFileSet* fset;
bool hasData;
bool hasTomb;
// reader
SDataFileReader* dataReader;
TSttFileReaderArray sttReaderArr[1];
// iter/merger
TTsdbIterArray dataIterArr[1];
SIterMerger* dataIterMerger;
TTsdbIterArray tombIterArr[1];
SIterMerger* tombIterMerger;
} ctx[1];
SDataFileWriter* dataWriter;
SSttFileWriter* sttWriter;
SBlockData blockData[1];
STombBlock tombBlock[1];
};
#if 0
// SNAP_DATA_TSDB
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0;
......@@ -666,7 +603,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
}
if (pId) {
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
code = tsdbUpdateTableSchema(pWriter->tsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
TSDB_CHECK_CODE(code, lino, _exit);
tMapDataReset(&pWriter->mDataBlk);
......@@ -690,9 +627,9 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->tsdb->pVnode), __func__,
pWriter->tbid.suid, pWriter->tbid.uid);
}
return code;
......@@ -712,7 +649,7 @@ static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -782,7 +719,7 @@ _write_row:
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -832,7 +769,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -843,7 +780,7 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid)
ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid);
STsdb* pTsdb = pWriter->pTsdb;
STsdb* pTsdb = pWriter->tsdb;
pWriter->fid = fid;
pWriter->tbid = (TABLEID){0};
......@@ -955,7 +892,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -1002,7 +939,7 @@ static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowIn
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -1021,7 +958,7 @@ static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInf
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -1076,9 +1013,9 @@ static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->tsdb->pVnode), __func__, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->pTsdb->pVnode), __func__);
tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->tsdb->pVnode), __func__);
}
return code;
}
......@@ -1139,9 +1076,9 @@ static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHd
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->pTsdb->pVnode), __func__,
tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->tsdb->pVnode), __func__,
pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow);
}
return code;
......@@ -1196,9 +1133,9 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID*
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->tsdb->pVnode), __func__,
pWriter->tbid.suid, pWriter->tbid.uid);
}
return code;
......@@ -1224,9 +1161,9 @@ static int32_t tsdbSnapWriteDelTableDataEnd(STsdbSnapWriter* pWriter) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbTrace("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
tsdbTrace("vgId:%d %s done", TD_VID(pWriter->tsdb->pVnode), __func__);
}
return code;
}
......@@ -1261,7 +1198,7 @@ static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId,
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -1270,7 +1207,7 @@ static int32_t tsdbSnapWriteDelDataStart(STsdbSnapWriter* pWriter) {
int32_t code = 0;
int32_t lino = 0;
STsdb* pTsdb = pWriter->pTsdb;
STsdb* pTsdb = pWriter->tsdb;
SDelFile* pDelFile = pWriter->fs.pDelFile;
pWriter->tbid = (TABLEID){0};
......@@ -1310,7 +1247,7 @@ static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
int32_t lino = 0;
STsdb* pTsdb = pWriter->pTsdb;
STsdb* pTsdb = pWriter->tsdb;
// end remaining table with NULL data
code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0);
......@@ -1352,7 +1289,7 @@ static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr
int32_t code = 0;
int32_t lino = 0;
STsdb* pTsdb = pWriter->pTsdb;
STsdb* pTsdb = pWriter->tsdb;
// start to write del data if need
if (pWriter->pDelFWriter == NULL) {
......@@ -1373,19 +1310,21 @@ _exit:
}
return code;
}
#endif
// APIs
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
int32_t code = 0;
int32_t lino = 0;
#if 0
// alloc
STsdbSnapWriter* pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pWriter->pTsdb = pTsdb;
pWriter->tsdb = pTsdb;
pWriter->sver = sver;
pWriter->ever = ever;
pWriter->minutes = pTsdb->keepCfg.days;
......@@ -1411,29 +1350,31 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
TSDB_CHECK_CODE(code, lino, _exit);
// SNAP_DATA_DEL
#endif
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
if (pWriter) {
tBlockDataDestroy(&pWriter->sData);
tBlockDataDestroy(&pWriter->bData);
tBlockDataDestroy(&pWriter->inData);
tsdbFSDestroy(&pWriter->fs);
taosMemoryFree(pWriter);
pWriter = NULL;
}
// if (pWriter) {
// tBlockDataDestroy(&pWriter->sData);
// tBlockDataDestroy(&pWriter->bData);
// tBlockDataDestroy(&pWriter->inData);
// tsdbFSDestroy(&pWriter->fs);
// taosMemoryFree(pWriter);
// pWriter = NULL;
// }
} else {
tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
}
*ppWriter = pWriter;
// *ppWriter = pWriter;
return code;
}
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer) {
int32_t code = 0;
int32_t lino = 0;
#if 0
if (pWriter->pDataFWriter) {
code = tsdbSnapWriteFileDataEnd(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -1444,32 +1385,34 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
code = tsdbFSPrepareCommit(pWriter->tsdb, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit);
#endif
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
}
return code;
}
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
STsdbSnapWriter* pWriter = *ppWriter;
STsdb* pTsdb = pWriter->pTsdb;
#if 0
STsdbSnapWriter* pWriter = *writer;
STsdb* pTsdb = pWriter->tsdb;
if (rollback) {
tsdbRollbackCommit(pWriter->pTsdb);
tsdbRollbackCommit(pWriter->tsdb);
} else {
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pWriter->pTsdb);
code = tsdbFSCommit(pWriter->tsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -1497,43 +1440,229 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
}
tsdbFSDestroy(&pWriter->fs);
taosMemoryFree(pWriter);
*ppWriter = NULL;
*writer = NULL;
#endif
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
// tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
// tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
static int32_t tsdbSnapWriteDoWriteTimeSeriesRow(STsdbSnapWriter* writer, const SRowInfo* row) {
int32_t code = 0;
int32_t lino = 0;
if (pHdr->type == SNAP_DATA_TSDB) {
code = tsdbSnapWriteTimeSeriesData(pWriter, pHdr);
TSDB_CHECK_CODE(code, lino, _exit);
// TODO
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, const SRowInfo* row) {
int32_t code = 0;
int32_t lino = 0;
while (writer->ctx->hasData) {
SRowInfo* row1 = tsdbIterMergerGetData(writer->ctx->dataIterMerger);
if (row1 == NULL) {
writer->ctx->hasData = false;
break;
}
int32_t c = tRowInfoCmprFn(row1, row);
if (c <= 0) {
code = tsdbSnapWriteDoWriteTimeSeriesRow(writer, row1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(writer->ctx->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
break;
}
}
if (row->suid == INT64_MAX) {
ASSERT(writer->ctx->hasData == false);
goto _exit;
} else if (pWriter->pDataFWriter) {
code = tsdbSnapWriteFileDataEnd(pWriter);
}
code = tsdbSnapWriteDoWriteTimeSeriesRow(writer, row);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
int32_t code = 0;
int32_t lino = 0;
// TODO
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) {
int32_t code = 0;
int32_t lino = 0;
// TODO
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
if (!writer->ctx->fsetWriteBegin) return 0;
int32_t code = 0;
int32_t lino = 0;
// TODO
SRowInfo row = {
.suid = INT64_MAX,
.uid = INT64_MAX,
};
code = tsdbSnapWriteTimeSeriesRow(writer, &row);
TSDB_CHECK_CODE(code, lino, _exit);
STombRecord record = {
.suid = INT64_MAX,
.uid = INT64_MAX,
};
code = tsdbSnapWriteTombRecord(writer, &record);
TSDB_CHECK_CODE(code, lino, _exit);
// close write
code = tsdbSttFileWriterClose(&writer->sttWriter, 0, writer->fopArr);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileWriterClose(&writer->dataWriter, 0, writer->fopArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
int32_t code = 0;
int32_t lino = 0;
SBlockData blockData[1] = {0};
code = tDecmprBlockData(hdr->data, hdr->size, blockData, writer->aBuf);
TSDB_CHECK_CODE(code, lino, _exit);
int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision);
if (fid != writer->ctx->fid) {
code = tsdbSnapWriteFileSetEnd(writer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbSnapWriteFileSetBegin(writer, fid);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pHdr->type == SNAP_DATA_DEL) {
code = tsdbSnapWriteDelData(pWriter, pHdr);
for (int32_t i = 0; i < blockData->nRow; ++i) {
SRowInfo rowInfo = {
.suid = blockData->suid,
.uid = blockData->uid ? blockData->uid : blockData->aUid[i],
.row = tsdbRowFromBlockData(blockData, i),
};
code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
} else {
tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__,
blockData->suid, blockData->uid, blockData->nRow);
}
tBlockDataDestroy(blockData);
return code;
}
static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) {
int32_t code = 0;
int32_t lino = 0;
// TODO
_exit:
return code;
}
static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
int32_t code = 0;
int32_t lino = 0;
STombBlock tombBlock[1] = {0};
code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) {
STombRecord record;
tTombBlockGet(tombBlock, i, &record);
code = tsdbSnapWriteTombRecord(writer, &record);
TSDB_CHECK_CODE(code, lino, _exit);
}
tTombBlockDestroy(tombBlock);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
int32_t code = 0;
int32_t lino = 0;
if (hdr->type == SNAP_DATA_TSDB) {
code = tsdbSnapWriteTimeSeriesData(writer, hdr);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (hdr->type == SNAP_DATA_DEL) {
code = tsdbSnapWriteTombData(writer, hdr);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERT(0);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code), pHdr->type, pHdr->index, pHdr->size);
TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
} else {
tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
pHdr->type, pHdr->index, pHdr->size);
tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
hdr->type, hdr->index, hdr->size);
}
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册