diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index faee6cc2facc9ddcc4cdcabf2c8b554ac9a673b7..1fb82dcaa04c4835c1ab4422baff701debd21139 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1713,8 +1713,9 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; - printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type, - pDataBlock->info.childId, pDataBlock->info.groupId); + printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag, + pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, + pDataBlock->info.groupId); for (int32_t j = 0; j < rows; j++) { printf("%s |", flag); for (int32_t k = 0; k < numOfCols; k++) { diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index e9e20912c52666d71af546b53e6a946cb53b6833..3d8d46a0fbdf39b40b3ac74e37e0d7c1b762d806 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -31,6 +31,7 @@ target_sources( "src/sma/smaOpen.c" "src/sma/smaCommit.c" "src/sma/smaRollup.c" + "src/sma/smaSnapshot.c" "src/sma/smaTimeRange.c" # tsdb diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 217d40e3aa9ee990f70a9b984cb9fde3beeb089a..c825ab673101b22cab9b36df479f41fac0120200 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -209,6 +209,9 @@ int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, // smaFileUtil ================ +typedef struct SQTaskFReader SQTaskFReader; +typedef struct SQTaskFWriter SQTaskFWriter; + #define TD_FILE_HEAD_SIZE 512 typedef struct STFInfo STFInfo; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9ed2b25fdf203507e46cb8a79f8c3340bcba5c4f..b1da5a788302179a5e7793ee561ccd3197204f2c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -62,6 +62,8 @@ typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapWriter SMetaSnapWriter; typedef struct STsdbSnapReader STsdbSnapReader; typedef struct STsdbSnapWriter STsdbSnapWriter; +typedef struct SRsmaSnapReader SRsmaSnapReader; +typedef struct SRsmaSnapWriter SRsmaSnapWriter; typedef struct SSnapDataHdr SSnapDataHdr; #define VNODE_META_DIR "meta" @@ -196,13 +198,21 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback); // STsdbSnapReader ======================================== -int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader); +int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader); int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); // STsdbSnapWriter ======================================== int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); +// SRsmaSnapReader ======================================== +int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader); +int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader); +int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData); +// SRsmaSnapWriter ======================================== +int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter); +int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback); typedef struct { int8_t streamType; // sma or other @@ -314,6 +324,15 @@ struct SSma { // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); +enum { + SNAP_DATA_META = 0, + SNAP_DATA_TSDB = 1, + SNAP_DATA_DEL = 2, + SNAP_DATA_RSMA1 = 3, + SNAP_DATA_RSMA2 = 4, + SNAP_DATA_QTASK = 5, +}; + struct SSnapDataHdr { int8_t type; int64_t index; diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 46609cf5613c91e1d65eecc257c43f33a733aa76..bb20a1a7ffa23872fb26a2752d1ff6597c03ee36 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -109,7 +109,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { } SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = 0; // TODO: use macro + pHdr->type = SNAP_DATA_META; pHdr->size = nData; memcpy(pHdr->data, pData, nData); diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 21dfd8a32d617a2674940c483287506bdf7a8852..c5cb816887180a44050770adfafd84fe5cf43c65 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -49,7 +49,8 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { - code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, &pReader->pDataReader[i]); + code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2, + &pReader->pDataReader[i]); if (code < 0) { goto _err; } @@ -221,10 +222,9 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { } } + smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma)); taosMemoryFree(pWriter); *ppWriter = NULL; - - smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma)); return code; _err: @@ -245,15 +245,17 @@ int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData); } else if (pHdr->type == SNAP_DATA_QTASK) { code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); + } else { + ASSERT(0); } if (code < 0) goto _err; _exit: - smaInfo("vgId:%d rsma snapshot write for data %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); + smaInfo("vgId:%d rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); return code; _err: - smaError("vgId:%d rsma snapshot write for data %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type, + smaError("vgId:%d rsma snapshot write for data type %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type, tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 51d7edcf71597e0829cb62660e5caf02e8c8008d..6bb2b8c253ff6a8a153f0194a9319f94513c3480 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -21,6 +21,7 @@ struct STsdbSnapReader { int64_t sver; int64_t ever; STsdbFS fs; + int8_t type; // for data file int8_t dataDone; int32_t fid; @@ -62,7 +63,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { pReader->iBlockIdx = 0; pReader->pBlockIdx = NULL; - tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read, fid:%d", TD_VID(pTsdb->pVnode), pReader->fid); + tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path, + pReader->fid); } while (true) { @@ -130,7 +132,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { } SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = 1; + pHdr->type = pReader->type; pHdr->size = size; TABLEID* pId = (TABLEID*)(&pHdr[1]); @@ -139,9 +141,9 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData); - tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64 + tsdbInfo("vgId:%d vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d", - TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid, + TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid, pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow, size); @@ -154,7 +156,8 @@ _exit: return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb read data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, + tstrerror(code)); return code; } @@ -212,7 +215,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { } SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = 2; + pHdr->type = SNAP_DATA_DEL; pHdr->size = size; TABLEID* pId = (TABLEID*)(&pHdr[1]); @@ -228,8 +231,8 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { n += tPutDelData((*ppData) + n, pDelData); } - tsdbInfo("vgId:%d vnode snapshot tsdb read del data, suid:%" PRId64 " uid:%d" PRId64 " size:%d", - TD_VID(pTsdb->pVnode), pDelIdx->suid, pDelIdx->uid, size); + tsdbInfo("vgId:%d vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%d" PRId64 " size:%d", + TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size); break; } @@ -238,11 +241,12 @@ _exit: return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->pVnode, + tstrerror(code)); return code; } -int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) { +int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) { int32_t code = 0; STsdbSnapReader* pReader = NULL; @@ -255,6 +259,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe pReader->pTsdb = pTsdb; pReader->sver = sver; pReader->ever = ever; + pReader->type = type; code = taosThreadRwlockRdlock(&pTsdb->rwLock); if (code) { @@ -297,12 +302,13 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe goto _err; } - tsdbInfo("vgId:%d vnode snapshot tsdb reader opened", TD_VID(pTsdb->pVnode)); + tsdbInfo("vgId:%d vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path); *ppReader = pReader; return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb reader open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, + tstrerror(code)); *ppReader = NULL; return code; } @@ -327,7 +333,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { tsdbFSUnref(pReader->pTsdb, &pReader->fs); - tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode)); + tsdbInfo("vgId:%d vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path); taosMemoryFree(pReader); *ppReader = NULL; @@ -368,10 +374,12 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { } _exit: + tsdbDebug("vgId:%d vnode snapshot tsdb read for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path); return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb read for %s failed since %s", TD_VID(pReader->pTsdb->pVnode), + pReader->pTsdb->path, tstrerror(code)); return code; } @@ -436,7 +444,8 @@ static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, return code; _err: - tsdbError("vgId:%d tsdb snapshot write append data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb snapshot write append data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), + pWriter->pTsdb->path, tstrerror(code)); return code; } @@ -522,9 +531,12 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { } _exit: + tsdbInfo("vgId:%d tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); return code; _err: + tsdbError("vgId:%d tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), + pWriter->pTsdb->path, tstrerror(code)); return code; } @@ -570,6 +582,8 @@ _exit: return code; _err: + tsdbError("vgId:%d tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), + pWriter->pTsdb->path, tstrerror(code)); return code; } @@ -708,8 +722,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb write table data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode), - tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), + pWriter->pTsdb->path, tstrerror(code)); return code; } @@ -794,11 +808,12 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { if (code) goto _err; _exit: + tsdbDebug("vgId:%d vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb write data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode), - tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), + pWriter->pTsdb->path, tstrerror(code)); return code; } @@ -833,11 +848,12 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { } _exit: - tsdbInfo("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); + tsdbInfo("vgId:%d vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path); return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, + tstrerror(code)); return code; } @@ -920,12 +936,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 code = tsdbSnapWriteTableData(pWriter, id); if (code) goto _err; - tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", - TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow); + tsdbInfo("vgId:%d vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", + TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow); return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, + tstrerror(code)); return code; } @@ -1015,7 +1032,8 @@ _exit: return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, + tstrerror(code)); return code; } @@ -1056,11 +1074,12 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { } _exit: - tsdbInfo("vgId:%d vnode snapshot tsdb write del end", TD_VID(pTsdb->pVnode)); + tsdbInfo("vgId:%d vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path); return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, + tstrerror(code)); return code; } @@ -1127,10 +1146,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr } *ppWriter = pWriter; - return code; + tsdbInfo("vgId:%d tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); + return code; _err: - tsdbError("vgId:%d tsdb snapshot writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, + tstrerror(code)); *ppWriter = NULL; return code; } @@ -1157,14 +1178,16 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { if (code) goto _err; } + tsdbInfo("vgId:%d vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); taosMemoryFree(pWriter); *ppWriter = NULL; - return code; _err: - tsdbError("vgId:%d vnode snapshot tsdb writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), - tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb writer close for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), + pWriter->pTsdb->path, tstrerror(code)); + taosMemoryFree(pWriter); + *ppWriter = NULL; return code; } @@ -1173,7 +1196,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; // ts data - if (pHdr->type == 1) { + if (pHdr->type == SNAP_DATA_TSDB) { code = tsdbSnapWriteData(pWriter, pData, nData); if (code) goto _err; @@ -1186,15 +1209,17 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) } // del data - if (pHdr->type == 2) { + if (pHdr->type == SNAP_DATA_DEL) { code = tsdbSnapWriteDel(pWriter, pData, nData); if (code) goto _err; } _exit: + tsdbDebug("vgId:%d tsdb snapshow write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); return code; _err: - tsdbError("vgId:%d tsdb snapshow write failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb snapshow write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path, + tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 3f8f81cb09a3ce270b9a1063dcb2bc0bd98e94e5..15cc6a7197cb88c95ace5db61e0a59c2a6221561 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -28,7 +28,8 @@ struct SVSnapReader { int8_t tsdbDone; STsdbSnapReader *pTsdbReader; // rsma - int8_t rsmaDone[TSDB_RETENTION_L2]; + int8_t rsmaDone; + SRsmaSnapReader *pRsmaReader; }; int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { @@ -57,6 +58,10 @@ _err: int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t code = 0; + if (pReader->pRsmaReader) { + rsmaSnapReaderClose(&pReader->pRsmaReader); + } + if (pReader->pTsdbReader) { tsdbSnapReaderClose(&pReader->pTsdbReader); } @@ -99,7 +104,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (!pReader->tsdbDone) { // open if not if (pReader->pTsdbReader == NULL) { - code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader); + code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, &pReader->pTsdbReader); if (code) goto _err; } @@ -118,40 +123,26 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } // RSMA ============== -#if 0 - if (VND_IS_RSMA(pReader->pVnode)) { - // RSMA1/RSMA2 - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (!pReader->rsmaDone[i]) { - if (!pReader->pVnode->pSma->pRSmaTsdb[i]) { - // no valid tsdb - pReader->rsmaDone[i] = 1; - continue; - } - if (pReader->pTsdbReader == NULL) { - code = tsdbSnapReaderOpen(pReader->pVnode->pSma->pRSmaTsdb[i], pReader->sver, pReader->ever, - &pReader->pTsdbReader); - if (code) goto _err; - } - - code = tsdbSnapRead(pReader->pTsdbReader, ppData); - if (code) { - goto _err; - } else { - if (*ppData) { - goto _exit; - } else { - pReader->tsdbDone = 1; - code = tsdbSnapReaderClose(&pReader->pTsdbReader); - if (code) goto _err; - } - } + if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) { + // open if not + if (pReader->pRsmaReader == NULL) { + code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader); + if (code) goto _err; + } + + code = rsmaSnapRead(pReader->pRsmaReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->tsdbDone = 1; + code = rsmaSnapReaderClose(&pReader->pRsmaReader); + if (code) goto _err; } } - // QTaskInfoFile - // TODO ... } -#endif *ppData = NULL; *nData = 0; @@ -186,6 +177,8 @@ struct SVSnapWriter { SMetaSnapWriter *pMetaSnapWriter; // tsdb STsdbSnapWriter *pTsdbSnapWriter; + // rsma + SRsmaSnapWriter *pRsmaSnapWriter; }; int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) { @@ -235,6 +228,11 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _err; } + if (pWriter->pRsmaSnapWriter) { + code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback); + if (code) goto _err; + } + if (!rollback) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN]; @@ -282,28 +280,51 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index, pHdr->type, nData); - if (pHdr->type == 0) { - // meta + switch (pHdr->type) { + case SNAP_DATA_META: { + // meta + if (pWriter->pMetaSnapWriter == NULL) { + code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); + if (code) goto _err; + } - if (pWriter->pMetaSnapWriter == NULL) { - code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); + code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); if (code) goto _err; - } + } break; + case SNAP_DATA_TSDB: { + // tsdb + if (pWriter->pTsdbSnapWriter == NULL) { + code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); + if (code) goto _err; + } - code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); - if (code) goto _err; - } else { - // tsdb + code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); + if (code) goto _err; + } break; + case SNAP_DATA_RSMA1: + case SNAP_DATA_RSMA2: { + // rsma1/rsma2 + if (pWriter->pRsmaSnapWriter == NULL) { + code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter); + if (code) goto _err; + } - if (pWriter->pTsdbSnapWriter == NULL) { - code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); + code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData); if (code) goto _err; - } + } break; + case SNAP_DATA_QTASK: { + // qtask for rsma + if (pWriter->pRsmaSnapWriter == NULL) { + code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter); + if (code) goto _err; + } - code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); - if (code) goto _err; + code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData); + if (code) goto _err; + } break; + default: + break; } - _exit: return code; diff --git a/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim b/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim index ec03aaf9db076b45538b8dc7d2071b4d6b9d0922..241781eed103e6c4cbf9971b18513ca79f213b09 100644 --- a/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim +++ b/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim @@ -47,7 +47,7 @@ endi $replica = 3 $vgroups = 1 -$retentions = 5s:7d,15s:21d +$retentions = 5s:7d,15s:21d,1m:365d print ============= create database sql create database db replica $replica vgroups $vgroups retentions $retentions @@ -114,7 +114,7 @@ endi vg_ready: print ====> create stable/child table -sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) +sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s sql show stables if $rows != 1 then @@ -129,20 +129,28 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT sleep 3000 -print ===> write 100 records -$N = 100 -$count = 0 -while $count < $N - $ms = 1659000000000 + $count - sql insert into ct1 values( $ms , $count , 2.1, 3.1) - $count = $count + 1 -endw +print ===> write 0-50 records +$ms = 0 +$cnt = 0 +while $cnt < 50 + $ms = $cnt . m + sql insert into ct1 values (now + $ms , $cnt , 2.1, 3.1) + $cnt = $cnt + 1 + endw +print ===> flush database db +sql flush database db; +sleep 5000 +print ===> write 51-100 records +while $cnt < 100 + $ms = $cnt . m + sql insert into ct1 values (now + $ms , $cnt , 2.1, 3.1) + $cnt = $cnt + 1 + endw -#sql flush database db; - - -sleep 3000 +print ===> flush database db +sql flush database db; +sleep 5000 print ===> stop dnode1 dnode2 dnode3 @@ -150,8 +158,6 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT -sleep 10000 - ######################################################## print ===> start dnode1 dnode2 dnode3 dnode4 system sh/exec.sh -n dnode1 -s start @@ -164,7 +170,7 @@ sleep 3000 print =============== query data sql connect sql use db -sql select * from ct1 +sql select * from ct1 where ts > now - 1d print rows: $rows print $data00 $data01 $data02 if $rows != 100 then