未验证 提交 4e189410 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14990 from taosdata/feat/tsdb_snapshot

feat(tsdb/sync): tsdb snapshot/sync integration
......@@ -332,7 +332,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
#define TSDB_CODE_VND_READ_END TAOS_DEF_ERROR_CODE(0, 0x051c)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
......@@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
int32_t tsdbLastrowReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
......@@ -24,12 +24,12 @@ extern "C" {
// tsdbDebug ================
// clang-format off
// Leveled TSDB log macros: each one forwards to taosPrintLog only when the matching DEBUG_* bit is set in
// tsdbDebugFlag. The fatal/error/warn/info variants pass 255 as the third argument, debug/trace pass tsdbDebugFlag.
// NOTE(review): the six macros appear twice below; the two copies differ only in the prefix string
// ("TSDB ..." vs "TSD ...") - presumably the before/after sides of a change. Confirm which copy is live.
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSD FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSD ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSD WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSD ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSD ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct TSDBROW TSDBROW;
......@@ -115,7 +115,6 @@ int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock);
// SBlockIdx
void tBlockIdxReset(SBlockIdx *pBlockIdx);
int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph);
int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
......@@ -126,6 +125,8 @@ void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tPutColData(uint8_t *p, SColData *pColData);
int32_t tGetColData(uint8_t *p, SColData *pColData);
// SBlockData
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
......@@ -134,14 +135,17 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema);
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataClearData(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
// SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph);
......@@ -202,7 +206,7 @@ int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile);
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet);
void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid);
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState);
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag);
// tsdbReaderWriter.c ==============================================================================================
// SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
......@@ -357,10 +361,6 @@ struct TSDBROW {
// Per-table block index record. Code in this view passes an SBlockIdx to tsdbReadBlock to load that table's
// SBlock map, and copies its suid/uid into a TABLEID when emitting snapshot data.
// NOTE(review): the enclosing hunk shrinks from 10 to 6 lines, so minKey/maxKey/minVersion/maxVersion are
// presumably the fields being removed - confirm before relying on them.
struct SBlockIdx {
// super-table uid and table uid identifying the table this record belongs to
int64_t suid;
int64_t uid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
// presumably the location and length of the table's block map in the head file - TODO confirm
int64_t offset;
int64_t size;
};
......
......@@ -309,6 +309,7 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
// Header placed in front of every snapshot payload; the snapshot readers in this view allocate
// sizeof(SSnapDataHdr) + size bytes and fill the payload right after the header.
struct SSnapDataHdr {
// payload kind; readers visible in this view write 0 (meta entry), 1 (tsdb block data) and 2 (delete data)
int8_t type;
// NOTE(review): not written by any code visible in this view - purpose to be confirmed
int64_t index;
// number of payload bytes that follow the header
int64_t size;
// flexible array member holding the payload
uint8_t data[];
};
......
......@@ -26,60 +26,72 @@ struct SMetaSnapReader {
// Opens a snapshot reader over the meta table DB for the inclusive version range [sver, ever].
// On success *ppReader receives the new reader and 0 is returned; on failure *ppReader is set to NULL, the
// failure is logged and the error code is returned.
// NOTE(review): this span shows both the `pMetaSnapReader` and the `pReader` spelling of most statements -
// presumably the before/after sides of a rename. Confirm which lines are live before editing.
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
int32_t code = 0;
int32_t c = 0;
SMetaSnapReader* pMetaSnapReader = NULL;
SMetaSnapReader* pReader = NULL;
// alloc
pMetaSnapReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaSnapReader));
if (pMetaSnapReader == NULL) {
pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMetaSnapReader->pMeta = pMeta;
pMetaSnapReader->sver = sver;
pMetaSnapReader->ever = ever;
pReader->pMeta = pMeta;
pReader->sver = sver;
pReader->ever = ever;
// impl
// open a cursor on the table DB; the reader memory is released here because _err does not free it
code = tdbTbcOpen(pMeta->pTbDb, &pMetaSnapReader->pTbc, NULL);
code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
// seek to the key {version = sver, uid = INT64_MIN}; `c` is an out-parameter that is not inspected here
code = tdbTbcMoveTo(pMetaSnapReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) {
// NOTE(review): the cursor opened by tdbTbcOpen above is not closed on this path (only the reader memory
// is freed) - looks like a cursor leak; confirm whether tdbTbcClose is needed here.
taosMemoryFree(pReader);
goto _err;
}
*ppReader = pMetaSnapReader;
metaInfo("vgId:%d vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));
*ppReader = pReader;
return code;
_err:
metaError("vgId:%d meta snap reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
metaError("vgId:%d vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
// Closes the reader's table-DB cursor, frees the reader and resets *ppReader to NULL. Always returns 0.
// *ppReader is dereferenced unconditionally, so the caller must pass a valid, open reader.
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pTbc);
taosMemoryFree(*ppReader);
*ppReader = NULL;
// NOTE(review): two consecutive returns - presumably the before/after sides of a change; both yield 0.
return 0;
return code;
}
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL;
const void* pData = NULL;
int32_t nKey = 0;
int32_t nData = 0;
int32_t code = 0;
STbDbKey key;
*ppData = NULL;
for (;;) {
code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData);
if (code || ((STbDbKey*)pData)->version > pReader->ever) {
code = TSDB_CODE_VND_READ_END;
if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
goto _exit;
}
if (((STbDbKey*)pData)->version < pReader->sver) {
key = ((STbDbKey*)pKey)[0];
if (key.version > pReader->ever) {
goto _exit;
}
if (key.version < pReader->sver) {
tdbTbcMoveToNext(pReader->pTbc);
continue;
}
......@@ -88,17 +100,28 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
break;
}
// copy the data
if (tRealloc(ppData, sizeof(SSnapDataHdr) + nData) < 0) {
ASSERT(pData && nData);
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
goto _err;
}
((SSnapDataHdr*)(*ppData))->type = 0; // TODO: use macro
((SSnapDataHdr*)(*ppData))->size = nData;
memcpy(((SSnapDataHdr*)(*ppData))->data, pData, nData);
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 0; // TODO: use macro
pHdr->size = nData;
memcpy(pHdr->data, pData, nData);
metaInfo("vgId:%d vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
_exit:
return code;
_err:
metaError("vgId:%d vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code));
return code;
}
// SMetaSnapWriter ========================================
......@@ -108,18 +131,6 @@ struct SMetaSnapWriter {
int64_t ever;
};
static int32_t metaSnapRollback(SMetaSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t metaSnapCommit(SMetaSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
int32_t code = 0;
SMetaSnapWriter* pWriter;
......@@ -148,10 +159,9 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
SMetaSnapWriter* pWriter = *ppWriter;
if (rollback) {
code = metaSnapRollback(pWriter);
if (code) goto _err;
ASSERT(0);
} else {
code = metaSnapCommit(pWriter);
code = metaCommit(pWriter->pMeta);
if (code) goto _err;
}
taosMemoryFree(pWriter);
......@@ -170,15 +180,16 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
SMetaEntry metaEntry = {0};
SDecoder* pDecoder = &(SDecoder){0};
tDecoderInit(pDecoder, pData, nData);
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
metaDecodeEntry(pDecoder, &metaEntry);
code = metaHandleEntry(pMeta, &metaEntry);
if (code) goto _err;
tDecoderClear(pDecoder);
return code;
_err:
metaError("vgId:%d meta snapshot write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
metaError("vgId:%d vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code;
}
\ No newline at end of file
......@@ -476,9 +476,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else {
// tBlockDataClear(&state->blockData);
// tBlockDataClear(&state->blockData, 1);
if (state->pBlockData) {
tBlockDataClear(state->pBlockData);
tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL;
}
......@@ -500,9 +500,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (code) goto _err;
/* if (state->pBlockIdx) { */
/* tBlockIdxReset(state->blockIdx); */
/* } */
/* tBlockIdxReset(state->blockIdx); */
/* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx,
* &state->blockIdx);
*/
......@@ -582,8 +580,8 @@ _err:
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData);
tBlockDataClear(state->pBlockData);
// tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL;
}
......@@ -609,8 +607,8 @@ int32_t clearNextRowFromFS(void *iter) {
state->aBlockIdx = NULL;
}
if (state->pBlockData) {
// tBlockDataClear(&state->blockData);
tBlockDataClear(state->pBlockData);
// tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL;
}
......
......@@ -263,7 +263,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap);
tBlockDataReset(&pCommitter->oBlockData);
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ);
if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err;
......@@ -284,16 +284,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
.fLast = {.commitID = pCommitter->commitID, .size = 0},
.fSma = pRSet->fSma};
} else {
STfs *pTfs = pTsdb->pVnode->pTfs;
SDiskID did = {.level = 0, .id = 0};
// TODO: alloc a new disk
// tfsAllocDisk(pTfs, 0, &did);
// create the directory
tfsMkdirRecurAt(pTfs, pTsdb->path, did);
wSet = (SDFileSet){.diskId = did,
wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0},
.fid = pCommitter->commitFid,
.fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
.fData = {.commitID = pCommitter->commitID, .size = 0},
......@@ -1001,10 +992,10 @@ _exit:
// Releases the containers a committer holds for data commit: both block-index arrays, both block maps,
// both block-data buffers, and the table/row schemas.
// NOTE(review): each tBlockDataClear call appears in a one-argument and a two-argument form; the header
// fragment in this view likewise declares both `tBlockDataClear(SBlockData *)` and
// `tBlockDataClear(SBlockData *, int8_t deepClear)` - presumably before/after; confirm which is live.
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->aBlockIdx);
tMapDataClear(&pCommitter->oBlockMap);
tBlockDataClear(&pCommitter->oBlockData);
tBlockDataClear(&pCommitter->oBlockData, 1);
taosArrayDestroy(pCommitter->aBlockIdxN);
tMapDataClear(&pCommitter->nBlockMap);
tBlockDataClear(&pCommitter->nBlockData);
tBlockDataClear(&pCommitter->nBlockData, 1);
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}
......
......@@ -698,6 +698,6 @@ void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid) {
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; }
// Looks up a data file set by fid in pState->aDFileSet using taosArraySearch with tDFileSetCmprFn, and
// returns the search result unchanged (callers in this view treat NULL as "no such file set").
// In the three-argument form `flag` is forwarded as the search mode; callers in this view pass TD_EQ for an
// exact fid and TD_GT to step to the next file set (presumably "first fid greater than" - TODO confirm).
// NOTE(review): both the two-argument (hard-coded TD_EQ) and three-argument signatures are shown -
// presumably before/after; confirm which is live.
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) {
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag) {
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, flag);
}
......@@ -3006,14 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataClear(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData, 1);
terrno = code;
return NULL;
}
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
tBlockDataClear(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData, 1);
return pReader->pResBlock->pDataBlock;
}
......@@ -3132,8 +3132,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
hasNext = (pBlockIter->numOfBlocks > 0);
}
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
}
return code;
......@@ -3204,4 +3204,3 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS;
}
......@@ -979,21 +979,21 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) {
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
goto _err;
}
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) {
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
goto _err;
}
}
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
}
tFree(pBuf1);
......@@ -1115,29 +1115,29 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
if (code) {
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
goto _err;
}
code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) {
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
goto _err;
}
// merge two block data
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) {
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
goto _err;
}
}
tBlockDataClear(pBlockData1);
tBlockDataClear(pBlockData2);
tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1);
}
ASSERT(pBlock->nRow == pBlockData->nRow);
......
......@@ -29,29 +29,27 @@ struct STsdbSnapReader {
SBlockIdx* pBlockIdx;
SMapData mBlock; // SMapData<SBlock>
int32_t iBlock;
SBlockData blkData;
SBlockData oBlockData;
SBlockData nBlockData;
// for del file
int8_t delDone;
SDelFReader* pDelFReader;
int32_t iDelIdx;
SArray* aDelIdx; // SArray<SDelIdx>
int32_t iDelIdx;
SArray* aDelData; // SArray<SDelData>
};
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
while (true) {
if (pReader->pDataFReader == NULL) {
SDFileSet* pSet = NULL;
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->cState, pReader->fid, TD_GT);
// search the next data file set to read (todo)
if (0 /* TODO */) {
code = TSDB_CODE_VND_READ_END;
goto _exit;
}
if (pSet == NULL) goto _exit;
// open
pReader->fid = pSet->fid;
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
if (code) goto _err;
......@@ -61,6 +59,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iBlockIdx = 0;
pReader->pBlockIdx = NULL;
tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read, fid:%d", TD_VID(pTsdb->pVnode), pReader->fid);
}
while (true) {
......@@ -73,17 +73,15 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
pReader->iBlockIdx++;
// SBlock
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL);
if (code) goto _err;
pReader->iBlock = 0;
}
while (true) {
SBlock block;
SBlock* pBlock = &block;
while (true) {
if (pReader->iBlock >= pReader->mBlock.nItem) {
pReader->pBlockIdx = NULL;
break;
......@@ -92,15 +90,60 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
pReader->iBlock++;
if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) ||
(pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) {
// overlap (todo)
if (pBlock->minVersion > pReader->ever || pBlock->maxVersion < pReader->sver) continue;
code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->blkData, NULL, NULL);
code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->oBlockData, NULL, NULL);
if (code) goto _err;
goto _exit;
// filter
tBlockDataReset(&pReader->nBlockData);
for (int32_t iColData = 0; iColData < taosArrayGetSize(pReader->oBlockData.aIdx); iColData++) {
SColData* pColDataO = tBlockDataGetColDataByIdx(&pReader->oBlockData, iColData);
SColData* pColDataN = NULL;
code = tBlockDataAddColData(&pReader->nBlockData, taosArrayGetSize(pReader->nBlockData.aIdx), &pColDataN);
if (code) goto _err;
tColDataInit(pColDataN, pColDataO->cid, pColDataO->type, pColDataO->smaOn);
}
for (int32_t iRow = 0; iRow < pReader->oBlockData.nRow; iRow++) {
TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow);
int64_t version = TSDBROW_VERSION(&row);
if (version < pReader->sver || version > pReader->ever) continue;
code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL);
if (code) goto _err;
}
// org data
// compress data (todo)
int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData);
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 1;
pHdr->size = size;
TABLEID* pId = (TABLEID*)(&pHdr[1]);
pId->suid = pReader->pBlockIdx->suid;
pId->uid = pReader->pBlockIdx->uid;
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
size);
goto _exit;
}
}
}
......@@ -109,7 +152,7 @@ _exit:
return code;
_err:
tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb read data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -120,7 +163,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
if (pReader->pDelFReader == NULL) {
if (pDelFile == NULL) {
code = TSDB_CODE_VND_READ_END;
goto _exit;
}
......@@ -135,15 +177,20 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iDelIdx = 0;
}
while (pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx)) {
while (true) {
if (pReader->iDelIdx >= taosArrayGetSize(pReader->aDelIdx)) {
tsdbDelFReaderClose(&pReader->pDelFReader);
break;
}
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
int32_t size = 0;
pReader->iDelIdx++;
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
if (code) goto _err;
int32_t size = 0;
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
......@@ -152,46 +199,44 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
}
}
if (size > 0) {
int64_t n = 0;
if (size == 0) continue;
size = size + sizeof(SSnapDataHdr) + sizeof(TABLEID);
code = tRealloc(ppData, size);
if (code) goto _err;
// org data
size = sizeof(TABLEID) + size;
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// SSnapDataHdr
SSnapDataHdr* pSnapDataHdr = (SSnapDataHdr*)(*ppData + n);
pSnapDataHdr->type = 1;
pSnapDataHdr->size = size; // TODO: size here may incorrect
n += sizeof(SSnapDataHdr);
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 2;
pHdr->size = size;
// TABLEID
TABLEID* pId = (TABLEID*)(*ppData + n);
TABLEID* pId = (TABLEID*)(&pHdr[1]);
pId->suid = pDelIdx->suid;
pId->uid = pDelIdx->uid;
n += sizeof(*pId);
// DATA
int32_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
n += tPutDelData(*ppData + n, pDelData);
}
}
if (pDelData->version < pReader->sver) continue;
if (pDelData->version > pReader->ever) continue;
goto _exit;
}
n += tPutDelData((*ppData) + n, pDelData);
}
code = TSDB_CODE_VND_READ_END;
tsdbDelFReaderClose(&pReader->pDelFReader);
tsdbInfo("vgId:%d vnode snapshot tsdb read del data, suid:%" PRId64 " uid:%d" PRId64 " size:%d",
TD_VID(pTsdb->pVnode), pDelIdx->suid, pDelIdx->uid, size);
break;
}
_exit:
return code;
_err:
tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -209,15 +254,16 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
pReader->sver = sver;
pReader->ever = ever;
pReader->fid = INT32_MIN;
pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pReader->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->mBlock = tMapDataInit();
code = tBlockDataInit(&pReader->blkData);
code = tBlockDataInit(&pReader->oBlockData);
if (code) goto _err;
code = tBlockDataInit(&pReader->nBlockData);
if (code) goto _err;
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
......@@ -225,18 +271,18 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pReader->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tsdbInfo("vgId:%d vnode snapshot tsdb reader opened", TD_VID(pTsdb->pVnode));
*ppReader = pReader;
return code;
_err:
tsdbError("vgId:%d snapshot reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
......@@ -245,37 +291,43 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
int32_t code = 0;
STsdbSnapReader* pReader = *ppReader;
taosArrayDestroy(pReader->aDelData);
taosArrayDestroy(pReader->aDelIdx);
if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader);
}
tBlockDataClear(&pReader->blkData);
tMapDataClear(&pReader->mBlock);
taosArrayDestroy(pReader->aBlockIdx);
if (pReader->pDataFReader) {
tsdbDataFReaderClose(&pReader->pDataFReader);
}
taosArrayDestroy(pReader->aBlockIdx);
tMapDataClear(&pReader->mBlock);
tBlockDataClear(&pReader->oBlockData, 1);
tBlockDataClear(&pReader->nBlockData, 1);
if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader);
}
taosArrayDestroy(pReader->aDelIdx);
taosArrayDestroy(pReader->aDelData);
tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode));
taosMemoryFree(pReader);
*ppReader = NULL;
return code;
}
// Produces the next snapshot chunk from a tsdb snapshot reader: data-file content first (until dataDone is
// set), then delete-file content (until delDone is set). *ppData is reset to NULL on entry and receives the
// chunk when one is produced.
// NOTE(review): two end-of-stream conventions are interleaved below - a TSDB_CODE_VND_READ_END return code,
// and a zero return with *ppData left NULL - presumably the before/after sides of a change. Confirm which
// one callers should expect.
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
*ppData = NULL;
// read data file
if (!pReader->dataDone) {
code = tsdbSnapReadData(pReader, ppData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
pReader->dataDone = 1;
} else {
goto _err;
}
} else {
// a chunk was produced: hand it to the caller; otherwise the data phase is finished
if (*ppData) {
goto _exit;
} else {
pReader->dataDone = 1;
}
}
}
......@@ -283,23 +335,21 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
// delete-file phase, same structure as the data phase above
if (!pReader->delDone) {
code = tsdbSnapReadDel(pReader, ppData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
pReader->delDone = 1;
} else {
goto _err;
}
} else {
if (*ppData) {
goto _exit;
} else {
pReader->delDone = 1;
}
}
}
code = TSDB_CODE_VND_READ_END;
_exit:
return code;
_err:
tsdbError("vgId:%d snapshot read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -315,246 +365,540 @@ struct STsdbSnapWriter {
int32_t minRow;
int32_t maxRow;
int8_t cmprAlg;
int64_t commitID;
// for data file
SBlockData bData;
int32_t fid;
SDataFReader* pDataFReader;
SArray* aBlockIdx;
SArray* aBlockIdx; // SArray<SBlockIdx>
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock;
SMapData mBlock; // SMapData<SBlock>
int32_t iBlock;
SBlock* pBlock;
SBlock block;
SBlockData blockData;
SBlockData* pBlockData;
int32_t iRow;
SBlockData bDataR;
SDataFWriter* pDataFWriter;
SArray* aBlockIdxN;
SBlockIdx* pBlockIdxN;
SBlockIdx blockIdx;
SMapData mBlockN;
SBlock* pBlockN;
SBlock blockN;
SBlockData nBlockData;
SBlockIdx* pBlockIdxW; // NULL when no committing table
SBlock blockW;
SBlockData bDataW;
SBlockIdx blockIdxW;
SMapData mBlockW; // SMapData<SBlock>
SArray* aBlockIdxW; // SArray<SBlockIdx>
// for del file
SDelFReader* pDelFReader;
SDelFWriter* pDelFWriter;
int32_t iDelIdx;
SArray* aDelIdx;
SArray* aDelIdxR;
SArray* aDelData;
SArray* aDelIdxN;
SArray* aDelIdxW;
};
// Placeholder that is meant to append rows of a block-data buffer to pWriter->bDataW; pData/nData are not
// used yet and the source buffer is still a NULL stub (see the `todo` markers).
// NOTE(review): this span interleaves the signature/stub lines of tsdbSnapRollback and tsdbSnapCommit with
// tsdbSnapWriteAppendData - presumably the before/after sides of a change; confirm which lines are live.
static int32_t tsdbSnapRollback(STsdbSnapWriter* pWriter) {
static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
// TODO
int32_t iRow = 0; // todo
int32_t nRow = 0; // todo
SBlockData* pBlockData = NULL; // todo
// NOTE(review): iRow is never advanced inside the loop; it is harmless only while nRow stays 0. Once the
// stubs are filled in this would spin forever (and pBlockData is NULL) - add iRow++ when implementing.
while (iRow < nRow) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
if (code) goto _err;
}
return code;
}
static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) {
int32_t code = 0;
// TODO
_err:
tsdbError("vgId:%d tsdb snapshot write append data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
// Finishes the table currently being written by the snapshot writer (pWriter->pBlockIdxW):
//   1. drains the remaining rows of pWriter->pBlockData into bDataW, flushing bDataW as blocks fill up,
//   2. flushes whatever is left in bDataW,
//   3. carries over the remaining SBlock items of mBlock (rewriting the data of blocks flagged `last`),
//   4. writes the block map and appends the table's SBlockIdx to aBlockIdxW.
// Returns 0 on success or the first error code, which is also logged.
// NOTE(review): this span interleaves lines of the older tsdbSnapWriteDataEnd (early exit on a NULL
// pDataFWriter, tsdbDataFWriterClose, the pDataFReader close) with tsdbSnapWriteTableDataEnd - presumably
// the before/after sides of a change; confirm which lines are live before editing.
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDataFWriter == NULL) goto _exit;
ASSERT(pWriter->pDataFWriter);
// TODO
// nothing to finish when no table is being committed
if (pWriter->pBlockIdxW == NULL) goto _exit;
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 0);
// consume remain rows
if (pWriter->pBlockData) {
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
while (pWriter->iRow < pWriter->pBlockData->nRow) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
if (code) goto _err;
if (pWriter->pDataFReader) {
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
// flush once the write buffer reaches 4/5 of maxRow, then start a fresh block
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) {
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
}
pWriter->iRow++;
}
}
// write remain data if has
if (pWriter->bDataW.nRow > 0) {
pWriter->blockW.last = 0;
if (pWriter->bDataW.nRow < pWriter->minRow) {
// NOTE(review): the loop further down stops at iBlock >= mBlock.nItem, so `iBlock > nItem` looks like it can
// never be true and a short tail block would never be flagged `last` - presumably `>=` was intended; confirm.
if (pWriter->iBlock > pWriter->mBlock.nItem) {
pWriter->blockW.last = 1;
}
}
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
}
// carry over the blocks of the existing table that were not consumed above
while (true) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
SBlock block;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
// blocks flagged `last` have their data re-read and written through the new writer; other blocks
// only have their SBlock entry copied into the new map
if (block.last) {
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
if (code) goto _err;
tBlockReset(&block);
block.last = 1;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
pWriter->cmprAlg);
if (code) goto _err;
}
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
pWriter->iBlock++;
}
// SBlock
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
if (code) goto _err;
// SBlockIdx
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb snapshot writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
// Moves one existing table (pBlockIdx) from the data file being read to the one being written without
// merging new rows: loads the table's SBlock map, rewrites the data of blocks flagged `last`, copies every
// SBlock entry into mBlockW, writes the block map and appends a fresh SBlockIdx (same suid/uid) to aBlockIdxW.
// Returns 0 on success or the first error code, which is also logged.
// NOTE(review): this span interleaves the older tsdbSnapWriteAppendData stub (signature, iRow/nRow/pBlockData
// stubs and an unterminated `while`) with tsdbSnapMoveWriteTableData - presumably the before/after sides of
// a change; confirm which lines are live before editing.
static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) {
int32_t code = 0;
int32_t iRow = 0; // todo
int32_t nRow = 0; // todo
SBlockData* pBlockData = NULL; // todo
while (iRow < nRow) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
// load the SBlock map of the table from the file being read
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
// SBlockData
SBlock block;
tMapDataReset(&pWriter->mBlockW);
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock);
// blocks flagged `last` have their data re-read and written through the new writer; other blocks
// only have their SBlock entry copied
if (block.last) {
code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
if (code) goto _err;
tBlockReset(&block);
block.last = 1;
code =
tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, pWriter->cmprAlg);
if (code) goto _err;
}
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
}
// SBlock
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx);
if (code) goto _err;
// SBlockIdx
if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
_exit:
return code;
_err:
// NOTE(review): the message says "write append data" although the surrounding code moves table data -
// possibly left over from the older function; confirm the intended wording.
tsdbError("vgId:%d tsdb snapshot write append data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWrite) {
static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
SBlockData* pBlockData = &pWriter->bData;
int32_t iRow = 0;
TSDBROW row;
TSDBROW* pRow = &row;
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
TABLEID id = {0}; // TODO
// correct schema
code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
if (code) goto _err;
// skip
while (pWriter->pBlockIdx && tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) {
code = tsdbSnapWriteTableDataEnd(pWriter);
// loop to merge
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
while (true) {
if (pRow == NULL) break;
if (pWriter->pBlockData) {
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow));
ASSERT(c);
if (c < 0) {
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (code) goto _err;
pWriter->iBlockIdx++;
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
iRow++;
if (iRow < pWriter->pBlockData->nRow) {
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
} else {
pWriter->pBlockIdx = NULL;
pRow = NULL;
}
} else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
if (code) goto _err;
pWriter->iRow++;
if (pWriter->iRow >= pWriter->pBlockData->nRow) {
pWriter->pBlockData = NULL;
}
}
} else {
TSDBKEY key = TSDBROW_KEY(pRow);
while (true) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
// new or merge
if (pWriter->pBlockIdx == NULL || tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) {
SBlock block;
int32_t c;
if (pWriter->pBlockIdxN && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxN)) != 0)) {
ASSERT(c > 0);
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
code = tsdbSnapWriteTableDataEnd(pWriter);
if (block.last) {
pWriter->pBlockData = &pWriter->bDataR;
code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
if (code) goto _err;
pWriter->iRow = 0;
pWriter->iBlock++;
break;
}
if (pWriter->pBlockIdxN == NULL) {
pWriter->pBlockIdx = &pWriter->blockIdx;
pWriter->pBlockIdx->suid = id.suid;
pWriter->pBlockIdx->uid = id.uid;
c = tsdbKeyCmprFn(&block.maxKey, &key);
ASSERT(c);
if (c < 0) {
if (pWriter->bDataW.nRow) {
pWriter->blockW.last = 0;
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
}
// loop to write the data
TSDBROW* pRow = NULL; // todo
int32_t nRow = 0; // todo
SBlockData* pBlockData = NULL; // todo
for (int32_t iRow = 0; iRow < nRow; iRow++) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
if (code) goto _err;
if (pWriter->nBlockData.nRow > pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN,
pWriter->pBlockN, pWriter->cmprAlg);
pWriter->iBlock++;
} else {
c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey);
ASSERT(c);
if (c > 0) {
pWriter->pBlockData = &pWriter->bDataR;
code =
tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
if (code) goto _err;
pWriter->iRow = 0;
pWriter->iBlock++;
}
break;
}
}
if (pWriter->pBlockData) continue;
code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
if (code) goto _err;
iRow++;
if (iRow < pBlockData->nRow) {
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
} else {
// skip
pRow = NULL;
}
}
_check_write:
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;
_write_block:
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW,
pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
}
return code;
_err:
tsdbError("vgId:%d vnode snapshot tsdb write table data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
// end last table write if should
if (pWriter->pBlockIdxW) {
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
if (c < 0) {
// end
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
// reset
pWriter->pBlockIdxW = NULL;
} else if (c > 0) {
ASSERT(0);
}
}
// start new table data write if need
if (pWriter->pBlockIdxW == NULL) {
// write table data ahead
while (true) {
if (pWriter->pBlock == NULL) break;
if (pWriter->pBlock->last) break;
if (tBlockCmprFn(&(SBlock){.minKey = {0}, .maxKey = {0}}, pWriter->pBlock) >= 0) break;
if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break;
code = tMapDataPutItem(&pWriter->mBlockN, pWriter->pBlock, tPutBlock);
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
if (c >= 0) break;
code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx);
if (code) goto _err;
pWriter->iBlockIdx++;
}
if (pWriter->pBlock) {
if (pWriter->pBlock->last) {
// load the last block and merge with the data (todo)
} else {
int32_t c = tBlockCmprFn(&(SBlock){0 /*TODO*/}, pWriter->pBlock);
// reader
pWriter->pBlockIdx = NULL;
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
ASSERT(pWriter->pDataFReader);
if (c > 0) {
// commit until pWriter->pBlock (todo)
} else {
// load the block and merge with the data (todo)
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock);
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
ASSERT(c >= 0);
if (c == 0) {
pWriter->pBlockIdx = pBlockIdx;
pWriter->iBlockIdx++;
}
}
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
if (code) goto _err;
} else {
int32_t nRow = 0;
SBlockData* pBlockData = NULL;
tMapDataReset(&pWriter->mBlock);
}
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
// writer
pWriter->pBlockIdxW = &pWriter->blockIdxW;
pWriter->pBlockIdxW->suid = id.suid;
pWriter->pBlockIdxW->uid = id.uid;
tBlockReset(&pWriter->blockW);
tBlockDataReset(&pWriter->bDataW);
tMapDataReset(&pWriter->mBlockW);
}
ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid);
ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid));
code = tsdbSnapWriteTableDataImpl(pWriter);
if (code) goto _err;
_exit:
return code;
_err:
tsdbError("vgId:%d vnode snapshot tsdb write data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code));
return code;
}
for (int32_t iRow = 0; iRow < nRow; iRow++) {
code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDataFWriter == NULL) goto _exit;
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err;
if (pWriter->nBlockData.nRow >= pWriter->maxRow * 4 / 5) {
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN,
pWriter->pBlockN, pWriter->cmprAlg);
while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx));
if (code) goto _err;
tBlockDataClearData(&pWriter->nBlockData);
}
}
pWriter->iBlockIdx++;
}
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
if (code) goto _err;
code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter));
if (code) goto _err;
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
if (code) goto _err;
if (pWriter->pDataFReader) {
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
if (code) goto _err;
}
_exit:
tsdbError("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode));
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write table data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
int64_t skey; // todo
int64_t ekey; // todo
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
int64_t n;
int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision));
// decode
SBlockData* pBlockData = &pWriter->bData;
n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData);
ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData);
// begin
// open file
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
// end last file data write if need
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err;
pWriter->fid = fid;
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid);
// reader
// read
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ);
if (pSet) {
// open
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err;
// SBlockIdx
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL);
if (code) goto _err;
} else {
ASSERT(pWriter->pDataFReader == NULL);
taosArrayClear(pWriter->aBlockIdx);
}
pWriter->iBlockIdx = 0;
pWriter->pBlockIdx = NULL;
tMapDataReset(&pWriter->mBlock);
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
tBlockDataReset(&pWriter->bDataR);
// writer
SDFileSet wSet = {0};
if (pSet == NULL) {
wSet = (SDFileSet){0}; // todo
// write
SDFileSet wSet;
if (pSet) {
wSet = (SDFileSet){.diskId = pSet->diskId,
.fid = fid,
.fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0},
.fData = pSet->fData,
.fLast = {.commitID = pWriter->commitID, .size = 0},
.fSma = pSet->fSma};
} else {
wSet = (SDFileSet){0}; // todo
wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0},
.fid = fid,
.fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0},
.fData = {.commitID = pWriter->commitID, .size = 0},
.fLast = {.commitID = pWriter->commitID, .size = 0},
.fSma = {.commitID = pWriter->commitID, .size = 0}};
}
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
if (code) goto _err;
taosArrayClear(pWriter->aBlockIdxN);
taosArrayClear(pWriter->aBlockIdxW);
tMapDataReset(&pWriter->mBlockW);
pWriter->pBlockIdxW = NULL;
tBlockDataReset(&pWriter->bDataW);
}
code = tsdbSnapWriteTableData(pWriter, pData, nData);
code = tsdbSnapWriteTableData(pWriter, id);
if (code) goto _err;
tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow);
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -570,28 +914,28 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err;
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdx, NULL);
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL);
if (code) goto _err;
}
// writer
SDelFile delFile = {.commitID = pTsdb->pVnode->state.commitID, .offset = 0, .size = 0};
SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0};
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
if (code) goto _err;
}
// process the del data
TABLEID id = {0}; // todo
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
while (true) {
SDelIdx* pDelIdx = NULL;
int64_t n = 0;
int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
SDelData delData;
SDelIdx delIdx;
int8_t toBreak = 0;
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx)) {
pDelIdx = taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx);
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) {
pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
}
if (pDelIdx) {
......@@ -632,7 +976,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxN, &delIdx) == NULL) {
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -644,7 +988,7 @@ _exit:
return code;
_err:
tsdbError("vgId:%d tsdb snapshot write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -653,8 +997,9 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) goto _exit;
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx);
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
if (code) goto _err;
......@@ -663,7 +1008,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdx, &delIdx) == NULL) {
if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -684,10 +1029,11 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
}
_exit:
tsdbInfo("vgId:%d vnode snapshot tsdb write del end", TD_VID(pTsdb->pVnode));
return code;
_err:
tsdbError("vgId:%d tsdb snapshow write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -705,6 +1051,54 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->sver = sver;
pWriter->ever = ever;
// config
pWriter->minutes = pTsdb->keepCfg.days;
pWriter->precision = pTsdb->keepCfg.precision;
pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pWriter->commitID = pTsdb->pVnode->state.commitID;
// for data file
code = tBlockDataInit(&pWriter->bData);
if (code) goto _err;
pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tBlockDataInit(&pWriter->bDataR);
if (code) goto _err;
pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->aBlockIdxW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tBlockDataInit(&pWriter->bDataW);
if (code) goto _err;
// for del file
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxR == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pWriter->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbFSBegin(pTsdb->fs);
if (code) goto _err;
*ppWriter = pWriter;
return code;
......@@ -719,7 +1113,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
STsdbSnapWriter* pWriter = *ppWriter;
if (rollback) {
code = tsdbSnapRollback(pWriter);
code = tsdbFSRollback(pWriter->pTsdb->fs);
if (code) goto _err;
} else {
code = tsdbSnapWriteDataEnd(pWriter);
......@@ -728,7 +1122,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code = tsdbSnapWriteDelEnd(pWriter);
if (code) goto _err;
code = tsdbSnapCommit(pWriter);
code = tsdbFSCommit(pWriter->pTsdb->fs);
if (code) goto _err;
}
......@@ -738,29 +1132,35 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
return code;
_err:
tsdbError("vgId:%d tsdb snapshot writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d vnode snapshot tsdb writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code));
return code;
}
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
int8_t type = pData[0];
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
// ts data
if (type == 0) {
code = tsdbSnapWriteData(pWriter, pData + 1, nData - 1);
if (pHdr->type == 1) {
code = tsdbSnapWriteData(pWriter, pData, nData);
if (code) goto _err;
goto _exit;
} else {
if (pWriter->pDataFWriter) {
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err;
}
}
// del data
if (type == 1) {
code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1);
if (pHdr->type == 2) {
code = tsdbSnapWriteDel(pWriter, pData, nData);
if (code) goto _err;
}
_exit:
return code;
_err:
......
......@@ -36,15 +36,15 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u
// alloc
code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
if (code) goto _err;
if (code) goto _exit;
code = tRealloc(&pMapData->pData, pMapData->nData);
if (code) goto _err;
if (code) goto _exit;
// put
pMapData->aOffset[nItem] = offset;
tPutItemFn(pMapData->pData + offset, pItem);
_err:
_exit:
return code;
}
......@@ -189,25 +189,12 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
}
// SBlockIdx ======================================================
/*
 * Put an SBlockIdx back into its initial state.
 *
 * The key and version ranges are stored inverted (min = largest value,
 * max = smallest value), and offset/size are set to -1.
 * The suid/uid fields are left untouched.
 */
void tBlockIdxReset(SBlockIdx *pBlockIdx) {
  // location fields: -1 (presumably "not written yet" -- confirm with callers)
  pBlockIdx->offset = -1;
  pBlockIdx->size = -1;

  // inverted timestamp range
  pBlockIdx->maxKey = TSKEY_MIN;
  pBlockIdx->minKey = TSKEY_MAX;

  // inverted version range
  pBlockIdx->maxVersion = VERSION_MIN;
  pBlockIdx->minVersion = VERSION_MAX;
}
int32_t tPutBlockIdx(uint8_t *p, void *ph) {
int32_t n = 0;
SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
n += tPutI64(p ? p + n : p, pBlockIdx->suid);
n += tPutI64(p ? p + n : p, pBlockIdx->uid);
n += tPutI64(p ? p + n : p, pBlockIdx->minKey);
n += tPutI64(p ? p + n : p, pBlockIdx->maxKey);
n += tPutI64v(p ? p + n : p, pBlockIdx->minVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->maxVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
n += tPutI64v(p ? p + n : p, pBlockIdx->size);
......@@ -220,10 +207,6 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
n += tGetI64(p + n, &pBlockIdx->suid);
n += tGetI64(p + n, &pBlockIdx->uid);
n += tGetI64(p + n, &pBlockIdx->minKey);
n += tGetI64(p + n, &pBlockIdx->maxKey);
n += tGetI64v(p + n, &pBlockIdx->minVersion);
n += tGetI64v(p + n, &pBlockIdx->maxVersion);
n += tGetI64v(p + n, &pBlockIdx->offset);
n += tGetI64v(p + n, &pBlockIdx->size);
......@@ -921,6 +904,76 @@ _exit:
return code;
}
/*
 * Encode one column (SColData) into the buffer `p`.
 *
 * Layout, in order: cid, type, smaOn, nVal, flag; then -- unless the flag is
 * exactly HAS_NONE or HAS_NULL -- an optional bitmap (when flag != HAS_VALUE),
 * an optional offset array (variable-length types only), nData and the raw
 * data bytes.
 *
 * `p` may be NULL: no memcpy is performed and the tPut* helpers receive NULL
 * (presumably they then only report the encoded size -- confirm).
 *
 * Returns the number of bytes the encoding occupies.
 */
int32_t tPutColData(uint8_t *p, SColData *pColData) {
  int32_t nBytes = 0;

  // fixed header
  nBytes += tPutI16v(p ? p + nBytes : p, pColData->cid);
  nBytes += tPutI8(p ? p + nBytes : p, pColData->type);
  nBytes += tPutI8(p ? p + nBytes : p, pColData->smaOn);
  nBytes += tPutI32v(p ? p + nBytes : p, pColData->nVal);
  nBytes += tPutU8(p ? p + nBytes : p, pColData->flag);

  // payload is only present when the column is not purely NONE or purely NULL
  if (pColData->flag != HAS_NONE && pColData->flag != HAS_NULL) {
    if (pColData->flag != HAS_VALUE) {
      // bitmap
      int32_t nBitMap = BIT2_SIZE(pColData->nVal);
      if (p) {
        memcpy(p + nBytes, pColData->pBitMap, nBitMap);
      }
      nBytes += nBitMap;
    }

    if (IS_VAR_DATA_TYPE(pColData->type)) {
      // offset array, one int32_t per value
      int32_t nOffset = sizeof(int32_t) * pColData->nVal;
      if (p) {
        memcpy(p + nBytes, pColData->aOffset, nOffset);
      }
      nBytes += nOffset;
    }

    // data length followed by the data itself
    nBytes += tPutI32v(p ? p + nBytes : p, pColData->nData);
    if (p) {
      memcpy(p + nBytes, pColData->pData, pColData->nData);
    }
    nBytes += pColData->nData;
  }

  return nBytes;
}
/*
 * Decode one column from the buffer `p` into `pColData`.
 *
 * Reads cid, type, smaOn, nVal and flag. When the flag is exactly HAS_NONE or
 * HAS_NULL decoding stops there; otherwise the bitmap (flag != HAS_VALUE),
 * the offset array (variable-length types) and the data section follow.
 *
 * Nothing is copied: pBitMap, aOffset and pData are set to point INTO `p`,
 * so `p` has to stay valid for as long as `pColData` is used.
 * NOTE(review): aOffset is `p + n` cast to int32_t*; alignment of that
 * address is not checked here -- confirm this is acceptable on all targets.
 *
 * Returns the number of bytes consumed from `p`.
 */
int32_t tGetColData(uint8_t *p, SColData *pColData) {
  int32_t nRead = 0;

  // fixed header
  nRead += tGetI16v(p + nRead, &pColData->cid);
  nRead += tGetI8(p + nRead, &pColData->type);
  nRead += tGetI8(p + nRead, &pColData->smaOn);
  nRead += tGetI32v(p + nRead, &pColData->nVal);
  nRead += tGetU8(p + nRead, &pColData->flag);

  // purely NONE / purely NULL columns carry no payload
  if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) {
    return nRead;
  }

  if (pColData->flag != HAS_VALUE) {
    // bitmap
    int32_t nBitMap = BIT2_SIZE(pColData->nVal);
    pColData->pBitMap = p + nRead;
    nRead += nBitMap;
  }

  if (IS_VAR_DATA_TYPE(pColData->type)) {
    // offset array, one int32_t per value
    int32_t nOffset = sizeof(int32_t) * pColData->nVal;
    pColData->aOffset = (int32_t *)(p + nRead);
    nRead += nOffset;
  }

  // data length followed by the data itself
  nRead += tGetI32v(p + nRead, &pColData->nData);
  pColData->pData = p + nRead;
  nRead += pColData->nData;

  return nRead;
}
static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) {
SColData *pColData1 = (SColData *)p1;
SColData *pColData2 = (SColData *)p2;
......@@ -962,11 +1015,11 @@ void tBlockDataReset(SBlockData *pBlockData) {
taosArrayClear(pBlockData->aIdx);
}
void tBlockDataClear(SBlockData *pBlockData) {
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aIdx);
taosArrayDestroyEx(pBlockData->aColData, tColDataClear);
taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL);
}
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) {
......@@ -1079,6 +1132,46 @@ _err:
return code;
}
/*
 * Make sure `pBlockData` contains every column that `pBlockDataFrom` has.
 *
 * Both column lists are walked in parallel by position. For each source
 * column the destination cursor advances until it either finds a column with
 * the same cid, or reaches the end / a column with a larger cid. In the
 * latter case a new column is inserted at the cursor, initialised with the
 * source column's cid/type/smaOn, and filled with one NONE value per row
 * already present in `pBlockData`.
 *
 * Destination columns that the source does not have are skipped, not removed.
 * (The walk only works if both lists are ordered by cid -- presumably an
 * invariant of SBlockData; confirm.)
 *
 * Returns 0 on success, or the first non-zero code returned by
 * tBlockDataAddColData / tColDataAppendValue.
 */
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom) {
  int32_t code = 0;
  int32_t iDst = 0;

  for (int32_t iSrc = 0; iSrc < taosArrayGetSize(pBlockDataFrom->aIdx); iSrc++) {
    SColData *pSrc = tBlockDataGetColDataByIdx(pBlockDataFrom, iSrc);

    for (;;) {
      SColData *pDst = NULL;
      if (iDst < taosArrayGetSize(pBlockData->aIdx)) {
        pDst = tBlockDataGetColDataByIdx(pBlockData, iDst);
      }

      if (pDst != NULL && pDst->cid <= pSrc->cid) {
        // existing destination column: step over it, and stop searching
        // once it is the column we were looking for
        int32_t isMatch = (pDst->cid == pSrc->cid);
        iDst++;
        if (isMatch) break;
        continue;
      }

      // the source column is missing in the destination: insert it here
      code = tBlockDataAddColData(pBlockData, iDst, &pDst);
      if (code) return code;

      tColDataInit(pDst, pSrc->cid, pSrc->type, pSrc->smaOn);

      // back-fill the rows that already exist with NONE
      for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
        code = tColDataAppendValue(pDst, &COL_VAL_NONE(pDst->cid, pDst->type));
        if (code) return code;
      }

      iDst++;
      break;
    }
  }

  return code;
}
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) {
int32_t code = 0;
......@@ -1239,6 +1332,52 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
*ppColData = NULL;
}
/*
 * Encode a whole SBlockData into the buffer `p`.
 *
 * Layout, in order: nRow, the aVersion array (nRow * int64_t), the aTSKEY
 * array (nRow * TSKEY), the column count (size of aIdx), then every column
 * as written by tPutColData.
 *
 * `p` may be NULL: nothing is copied and only the size is accumulated
 * (the tPut* helpers receive NULL in that case).
 *
 * Returns the number of bytes the encoding occupies.
 */
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData) {
  int32_t nBytes = 0;

  // row count
  nBytes += tPutI32v(p ? p + nBytes : p, pBlockData->nRow);

  // versions
  size_t szVersion = sizeof(int64_t) * pBlockData->nRow;
  if (p) {
    memcpy(p + nBytes, pBlockData->aVersion, szVersion);
  }
  nBytes = nBytes + szVersion;

  // timestamps
  size_t szTsKey = sizeof(TSKEY) * pBlockData->nRow;
  if (p) {
    memcpy(p + nBytes, pBlockData->aTSKEY, szTsKey);
  }
  nBytes = nBytes + szTsKey;

  // columns
  int32_t nCol = taosArrayGetSize(pBlockData->aIdx);
  nBytes += tPutI32v(p ? p + nBytes : p, nCol);
  for (int32_t iCol = 0; iCol < nCol; iCol++) {
    nBytes += tPutColData(p ? p + nBytes : p, tBlockDataGetColDataByIdx(pBlockData, iCol));
  }

  return nBytes;
}
/*
 * Decode an SBlockData from the buffer `p`.
 *
 * `pBlockData` is reset first. nRow is read, then aVersion and aTSKEY are
 * set to point INTO `p` (no copy is made), then the column count is read and
 * each column is added with tBlockDataAddColData and decoded by tGetColData.
 * `p` therefore has to outlive the use of `pBlockData`.
 *
 * Returns the number of bytes consumed, or -1 if adding a column fails.
 */
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) {
  int32_t nRead = 0;

  tBlockDataReset(pBlockData);

  // row count
  nRead += tGetI32v(p + nRead, &pBlockData->nRow);

  // versions: referenced in place
  size_t szVersion = sizeof(int64_t) * pBlockData->nRow;
  pBlockData->aVersion = (int64_t *)(p + nRead);
  nRead = nRead + szVersion;

  // timestamps: referenced in place
  size_t szTsKey = sizeof(TSKEY) * pBlockData->nRow;
  pBlockData->aTSKEY = (TSKEY *)(p + nRead);
  nRead = nRead + szTsKey;

  // columns
  int32_t nCol;
  nRead += tGetI32v(p + nRead, &nCol);
  for (int32_t iCol = 0; iCol < nCol; iCol++) {
    SColData *pCol;
    if (tBlockDataAddColData(pBlockData, iCol, &pCol)) {
      return -1;
    }
    nRead += tGetColData(p + nRead, pCol);
  }

  return nRead;
}
// ALGORITHM ==============================
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal;
......
......@@ -20,13 +20,13 @@ struct SVSnapReader {
SVnode *pVnode;
int64_t sver;
int64_t ever;
int64_t index;
// meta
int8_t metaDone;
SMetaSnapReader *pMetaReader;
// tsdb
int8_t tsdbDone;
STsdbSnapReader *pTsdbReader;
uint8_t *pData;
};
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
......@@ -42,12 +42,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapRe
pReader->sver = sver;
pReader->ever = ever;
code = metaSnapReaderOpen(pVnode->pMeta, sver, ever, &pReader->pMetaReader);
if (code) goto _err;
code = tsdbSnapReaderOpen(pVnode->pTsdb, sver, ever, &pReader->pTsdbReader);
if (code) goto _err;
vInfo("vgId:%d vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
*ppReader = pReader;
return code;
......@@ -60,54 +55,85 @@ _err:
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t code = 0;
tFree(pReader->pData);
if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader);
if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader);
taosMemoryFree(pReader);
if (pReader->pTsdbReader) {
tsdbSnapReaderClose(&pReader->pTsdbReader);
}
if (pReader->pMetaReader) {
metaSnapReaderClose(&pReader->pMetaReader);
}
vInfo("vgId:%d vnode snapshot reader closed", TD_VID(pReader->pVnode));
taosMemoryFree(pReader);
return code;
}
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0;
// META ==============
if (!pReader->metaDone) {
code = metaSnapRead(pReader->pMetaReader, &pReader->pData);
// open reader if not
if (pReader->pMetaReader == NULL) {
code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
if (code) goto _err;
}
code = metaSnapRead(pReader->pMetaReader, ppData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
pReader->metaDone = 1;
} else {
goto _err;
}
} else {
*ppData = pReader->pData;
*nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
if (*ppData) {
goto _exit;
} else {
pReader->metaDone = 1;
code = metaSnapReaderClose(&pReader->pMetaReader);
if (code) goto _err;
}
}
}
// TSDB ==============
if (!pReader->tsdbDone) {
code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData);
// open if not
if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader);
if (code) goto _err;
}
code = tsdbSnapRead(pReader->pTsdbReader, ppData);
if (code) {
if (code == TSDB_CODE_VND_READ_END) {
pReader->tsdbDone = 1;
} else {
goto _err;
}
} else {
*ppData = pReader->pData;
*nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
if (*ppData) {
goto _exit;
} else {
pReader->tsdbDone = 1;
code = tsdbSnapReaderClose(&pReader->pTsdbReader);
if (code) goto _err;
}
}
}
code = TSDB_CODE_VND_READ_END;
*ppData = NULL;
*nData = 0;
_exit:
if (*ppData) {
SSnapDataHdr *pHdr = (SSnapDataHdr *)(*ppData);
pReader->index++;
*nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index;
vInfo("vgId:%d vnode snapshot read data,index:%" PRId64 " type:%d nData:%d ", TD_VID(pReader->pVnode),
pReader->index, pHdr->type, *nData);
} else {
vInfo("vgId:%d vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
}
return code;
_err:
vError("vgId:% snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
vError("vgId:% vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
return code;
}
......@@ -116,24 +142,13 @@ struct SVSnapWriter {
SVnode *pVnode;
int64_t sver;
int64_t ever;
int64_t index;
// meta
SMetaSnapWriter *pMetaSnapWriter;
// tsdb
STsdbSnapWriter *pTsdbSnapWriter;
};
/*
 * Roll back a vnode snapshot write.
 *
 * Not implemented: `pWriter` is not touched and 0 is always returned.
 */
static int32_t vnodeSnapRollback(SVSnapWriter *pWriter) {
  // TODO
  return 0;
}
/*
 * Commit a vnode snapshot write.
 *
 * Not implemented: `pWriter` is not touched and 0 is always returned.
 */
static int32_t vnodeSnapCommit(SVSnapWriter *pWriter) {
  // TODO
  return 0;
}
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
int32_t code = 0;
SVSnapWriter *pWriter = NULL;
......@@ -148,62 +163,78 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
pWriter->sver = sver;
pWriter->ever = ever;
vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode));
*ppWriter = pWriter;
return code;
_err:
vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
int32_t code = 0;
if (rollback) {
code = vnodeSnapRollback(pWriter);
if (pWriter->pMetaSnapWriter) {
code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
if (code) goto _err;
} else {
code = vnodeSnapCommit(pWriter);
}
if (pWriter->pTsdbSnapWriter) {
code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
if (code) goto _err;
}
_exit:
vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback);
taosMemoryFree(pWriter);
return code;
_err:
vError("vgId:%d vnode snapshow writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
vError("vgId:%d vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
return code;
}
// Write one snapshot data packet (pData, nData bytes) through the vnode snapshot writer.
// The packet starts with an SSnapDataHdr; its `type` field routes the packet to the meta writer
// (type == 0) or to the tsdb writer (any other type).
// Returns 0 on success, otherwise the error code from the failing sub-writer call.
// NOTE(review): this span is a rendered diff in which removed and added lines are interleaved
// (duplicate header declarations, ASSERTs and write calls), so it is not compilable exactly as shown.
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0;
SSnapDataHdr *pSnapDataHdr = (SSnapDataHdr *)pData;
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
SVnode *pVnode = pWriter->pVnode;
// the header's payload size plus the header itself must account for the whole buffer
ASSERT(pSnapDataHdr->size + sizeof(SSnapDataHdr) == nData);
ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
// packets must arrive with an index exactly one past the last index recorded on the writer
ASSERT(pHdr->index == pWriter->index + 1);
pWriter->index = pHdr->index;
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
pHdr->type, nData);
if (pSnapDataHdr->type == 0) {
if (pHdr->type == 0) {
// meta
// the meta snapshot writer is opened lazily on the first meta packet
if (pWriter->pMetaSnapWriter == NULL) {
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
if (code) goto _err;
}
// one call passes only the payload after the header, the other passes the full buffer including the header
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
if (code) goto _err;
} else {
// tsdb
// the tsdb snapshot writer is opened lazily on the first non-meta packet
if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
if (code) goto _err;
}
// one call passes only the payload after the header, the other passes the full buffer including the header
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
if (code) goto _err;
}
_exit:
return code;
// failure exit: log the error (one variant also logs index/type/nData) and propagate the code
_err:
vError("vgId:%d vnode snapshot write failed since %s", TD_VID(pVnode), tstrerror(code));
vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
tstrerror(code), pHdr->index, pHdr->type, nData);
return code;
}
\ No newline at end of file
......@@ -223,6 +223,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
// snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
......
......@@ -711,6 +711,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncNodeEventLog(ths, logBuf);
} while (0);
// maybe update commit index by snapshot
syncNodeMaybeUpdateCommitBySnapshot(ths);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
......@@ -718,7 +721,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->matchIndex = ths->commitIndex;
// msg event log
do {
......
......@@ -147,6 +147,15 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// print log
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term,
pMsg->matchIndex, pMsg->success);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
......@@ -238,7 +247,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
SSnapshot oldSnapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm;
syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, newSnapshotTerm, pMsg);
SyncIndex endIndex;
if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex + 1)) {
endIndex = nextIndex;
} else {
endIndex = oldSnapshot.lastApplyIndex;
}
syncNodeStartSnapshotOnce(ths, pMsg->matchIndex + 1, endIndex, newSnapshotTerm, pMsg);
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
......@@ -256,6 +272,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->matchIndex > oldMatchIndex) {
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
}
// event log, update next-index
do {
char host[64];
......
......@@ -1083,6 +1083,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
return pSyncNode;
}
/**
 * Raise the node's commit index to the FSM snapshot's last applied index
 * when the snapshot is ahead of it.
 *
 * Does nothing when the node has no FSM, when the FSM provides no
 * FpGetSnapshotInfo callback, or when that callback reports a failure.
 * The commit index is only ever moved forward here, never backward.
 *
 * @param pSyncNode sync node whose commitIndex may be advanced
 */
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
  if (pSyncNode->pFsm == NULL || pSyncNode->pFsm->FpGetSnapshotInfo == NULL) {
    return;
  }

  // zero-initialize so no indeterminate field can be read if the callback leaves the struct unfilled
  SSnapshot snapshot = {0};
  int32_t   code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  ASSERT(code == 0);
  if (code != 0) {
    // in builds where ASSERT is a no-op, never derive the commit index from a failed query
    return;
  }

  if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    pSyncNode->commitIndex = snapshot.lastApplyIndex;
  }
}
void syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if (pSyncNode->replicaNum == 1) {
......
......@@ -336,8 +336,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_READ_END, "Read end")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
......@@ -170,84 +170,3 @@ if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start
#system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 7000
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode2 dnode4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
#system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
########################################################
print ===> start dnode1 dnode2 dnode3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
#system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册