提交 466e7965 编写于 作者: C Cary Xu

feat: rsma1/rsma2 support replication

上级 1e391ad2
...@@ -1713,8 +1713,9 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { ...@@ -1713,8 +1713,9 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type, printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag,
pDataBlock->info.childId, pDataBlock->info.groupId); pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag); printf("%s |", flag);
for (int32_t k = 0; k < numOfCols; k++) { for (int32_t k = 0; k < numOfCols; k++) {
......
...@@ -31,6 +31,7 @@ target_sources( ...@@ -31,6 +31,7 @@ target_sources(
"src/sma/smaOpen.c" "src/sma/smaOpen.c"
"src/sma/smaCommit.c" "src/sma/smaCommit.c"
"src/sma/smaRollup.c" "src/sma/smaRollup.c"
"src/sma/smaSnapshot.c"
"src/sma/smaTimeRange.c" "src/sma/smaTimeRange.c"
# tsdb # tsdb
......
...@@ -209,6 +209,9 @@ int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, ...@@ -209,6 +209,9 @@ int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen,
// smaFileUtil ================ // smaFileUtil ================
typedef struct SQTaskFReader SQTaskFReader;
typedef struct SQTaskFWriter SQTaskFWriter;
#define TD_FILE_HEAD_SIZE 512 #define TD_FILE_HEAD_SIZE 512
typedef struct STFInfo STFInfo; typedef struct STFInfo STFInfo;
......
...@@ -62,6 +62,8 @@ typedef struct SMetaSnapReader SMetaSnapReader; ...@@ -62,6 +62,8 @@ typedef struct SMetaSnapReader SMetaSnapReader;
typedef struct SMetaSnapWriter SMetaSnapWriter; typedef struct SMetaSnapWriter SMetaSnapWriter;
typedef struct STsdbSnapReader STsdbSnapReader; typedef struct STsdbSnapReader STsdbSnapReader;
typedef struct STsdbSnapWriter STsdbSnapWriter; typedef struct STsdbSnapWriter STsdbSnapWriter;
typedef struct SRsmaSnapReader SRsmaSnapReader;
typedef struct SRsmaSnapWriter SRsmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SSnapDataHdr SSnapDataHdr;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
...@@ -196,13 +198,21 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr ...@@ -196,13 +198,21 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback); int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapReader ======================================== // STsdbSnapReader ========================================
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader); int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader);
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader); int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ======================================== // STsdbSnapWriter ========================================
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// SRsmaSnapReader ========================================
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader);
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader);
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData);
// SRsmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback);
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other
...@@ -314,6 +324,15 @@ struct SSma { ...@@ -314,6 +324,15 @@ struct SSma {
// sma // sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
enum {
SNAP_DATA_META = 0,
SNAP_DATA_TSDB = 1,
SNAP_DATA_DEL = 2,
SNAP_DATA_RSMA1 = 3,
SNAP_DATA_RSMA2 = 4,
SNAP_DATA_QTASK = 5,
};
struct SSnapDataHdr { struct SSnapDataHdr {
int8_t type; int8_t type;
int64_t index; int64_t index;
......
...@@ -109,7 +109,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { ...@@ -109,7 +109,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 0; // TODO: use macro pHdr->type = SNAP_DATA_META;
pHdr->size = nData; pHdr->size = nData;
memcpy(pHdr->data, pData, nData); memcpy(pHdr->data, pData, nData);
......
...@@ -49,7 +49,8 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead ...@@ -49,7 +49,8 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) { if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, &pReader->pDataReader[i]); code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
&pReader->pDataReader[i]);
if (code < 0) { if (code < 0) {
goto _err; goto _err;
} }
...@@ -221,10 +222,9 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { ...@@ -221,10 +222,9 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
} }
} }
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
return code; return code;
_err: _err:
...@@ -245,15 +245,17 @@ int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -245,15 +245,17 @@ int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData); code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
} else if (pHdr->type == SNAP_DATA_QTASK) { } else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
} else {
ASSERT(0);
} }
if (code < 0) goto _err; if (code < 0) goto _err;
_exit: _exit:
smaInfo("vgId:%d rsma snapshot write for data %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); smaInfo("vgId:%d rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
return code; return code;
_err: _err:
smaError("vgId:%d rsma snapshot write for data %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type, smaError("vgId:%d rsma snapshot write for data type %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
tstrerror(code)); tstrerror(code));
return code; return code;
} }
......
...@@ -21,6 +21,7 @@ struct STsdbSnapReader { ...@@ -21,6 +21,7 @@ struct STsdbSnapReader {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
STsdbFS fs; STsdbFS fs;
int8_t type;
// for data file // for data file
int8_t dataDone; int8_t dataDone;
int32_t fid; int32_t fid;
...@@ -62,7 +63,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -62,7 +63,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iBlockIdx = 0; pReader->iBlockIdx = 0;
pReader->pBlockIdx = NULL; pReader->pBlockIdx = NULL;
tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read, fid:%d", TD_VID(pTsdb->pVnode), pReader->fid); tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path,
pReader->fid);
} }
while (true) { while (true) {
...@@ -130,7 +132,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -130,7 +132,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 1; pHdr->type = pReader->type;
pHdr->size = size; pHdr->size = size;
TABLEID* pId = (TABLEID*)(&pHdr[1]); TABLEID* pId = (TABLEID*)(&pHdr[1]);
...@@ -139,9 +141,9 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -139,9 +141,9 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData); tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64 tsdbInfo("vgId:%d vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d", " iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid, TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow, pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
size); size);
...@@ -154,7 +156,8 @@ _exit: ...@@ -154,7 +156,8 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb read data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
...@@ -212,7 +215,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -212,7 +215,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 2; pHdr->type = SNAP_DATA_DEL;
pHdr->size = size; pHdr->size = size;
TABLEID* pId = (TABLEID*)(&pHdr[1]); TABLEID* pId = (TABLEID*)(&pHdr[1]);
...@@ -228,8 +231,8 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -228,8 +231,8 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
n += tPutDelData((*ppData) + n, pDelData); n += tPutDelData((*ppData) + n, pDelData);
} }
tsdbInfo("vgId:%d vnode snapshot tsdb read del data, suid:%" PRId64 " uid:%d" PRId64 " size:%d", tsdbInfo("vgId:%d vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%d" PRId64 " size:%d",
TD_VID(pTsdb->pVnode), pDelIdx->suid, pDelIdx->uid, size); TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size);
break; break;
} }
...@@ -238,11 +241,12 @@ _exit: ...@@ -238,11 +241,12 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->pVnode,
tstrerror(code));
return code; return code;
} }
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) { int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
STsdbSnapReader* pReader = NULL; STsdbSnapReader* pReader = NULL;
...@@ -255,6 +259,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe ...@@ -255,6 +259,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
pReader->pTsdb = pTsdb; pReader->pTsdb = pTsdb;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
pReader->type = type;
code = taosThreadRwlockRdlock(&pTsdb->rwLock); code = taosThreadRwlockRdlock(&pTsdb->rwLock);
if (code) { if (code) {
...@@ -297,12 +302,13 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe ...@@ -297,12 +302,13 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
goto _err; goto _err;
} }
tsdbInfo("vgId:%d vnode snapshot tsdb reader opened", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
*ppReader = pReader; *ppReader = pReader;
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb reader open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
...@@ -327,7 +333,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -327,7 +333,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
tsdbFSUnref(pReader->pTsdb, &pReader->fs); tsdbFSUnref(pReader->pTsdb, &pReader->fs);
tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
taosMemoryFree(pReader); taosMemoryFree(pReader);
*ppReader = NULL; *ppReader = NULL;
...@@ -368,10 +374,12 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -368,10 +374,12 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
_exit: _exit:
tsdbDebug("vgId:%d vnode snapshot tsdb read for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb read for %s failed since %s", TD_VID(pReader->pTsdb->pVnode),
pReader->pTsdb->path, tstrerror(code));
return code; return code;
} }
...@@ -436,7 +444,8 @@ static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, ...@@ -436,7 +444,8 @@ static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData,
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot write append data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb snapshot write append data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
...@@ -522,9 +531,12 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { ...@@ -522,9 +531,12 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
} }
_exit: _exit:
tsdbInfo("vgId:%d tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
...@@ -570,6 +582,8 @@ _exit: ...@@ -570,6 +582,8 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
...@@ -708,8 +722,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { ...@@ -708,8 +722,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write table data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tsdbError("vgId:%d vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code)); pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
...@@ -794,11 +808,12 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { ...@@ -794,11 +808,12 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
if (code) goto _err; if (code) goto _err;
_exit: _exit:
tsdbDebug("vgId:%d vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tsdbError("vgId:%d vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code)); pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
...@@ -833,11 +848,12 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { ...@@ -833,11 +848,12 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
} }
_exit: _exit:
tsdbInfo("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
...@@ -920,12 +936,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 ...@@ -920,12 +936,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
code = tsdbSnapWriteTableData(pWriter, id); code = tsdbSnapWriteTableData(pWriter, id);
if (code) goto _err; if (code) goto _err;
tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", tsdbInfo("vgId:%d vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow); TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb write data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
...@@ -1015,7 +1032,8 @@ _exit: ...@@ -1015,7 +1032,8 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
...@@ -1056,11 +1074,12 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -1056,11 +1074,12 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
} }
_exit: _exit:
tsdbInfo("vgId:%d vnode snapshot tsdb write del end", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
...@@ -1127,10 +1146,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1127,10 +1146,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
} }
*ppWriter = pWriter; *ppWriter = pWriter;
return code;
tsdbInfo("vgId:%d tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
*ppWriter = NULL; *ppWriter = NULL;
return code; return code;
} }
...@@ -1157,14 +1178,16 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { ...@@ -1157,14 +1178,16 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
if (code) goto _err; if (code) goto _err;
} }
tsdbInfo("vgId:%d vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tsdbError("vgId:%d vnode snapshot tsdb writer close for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code)); pWriter->pTsdb->path, tstrerror(code));
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code; return code;
} }
...@@ -1173,7 +1196,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1173,7 +1196,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
// ts data // ts data
if (pHdr->type == 1) { if (pHdr->type == SNAP_DATA_TSDB) {
code = tsdbSnapWriteData(pWriter, pData, nData); code = tsdbSnapWriteData(pWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
...@@ -1186,15 +1209,17 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1186,15 +1209,17 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
} }
// del data // del data
if (pHdr->type == 2) { if (pHdr->type == SNAP_DATA_DEL) {
code = tsdbSnapWriteDel(pWriter, pData, nData); code = tsdbSnapWriteDel(pWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} }
_exit: _exit:
tsdbDebug("vgId:%d tsdb snapshow write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshow write failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb snapshow write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path,
tstrerror(code));
return code; return code;
} }
...@@ -28,7 +28,8 @@ struct SVSnapReader { ...@@ -28,7 +28,8 @@ struct SVSnapReader {
int8_t tsdbDone; int8_t tsdbDone;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
// rsma // rsma
int8_t rsmaDone[TSDB_RETENTION_L2]; int8_t rsmaDone;
SRsmaSnapReader *pRsmaReader;
}; };
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
...@@ -57,6 +58,10 @@ _err: ...@@ -57,6 +58,10 @@ _err:
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t code = 0; int32_t code = 0;
if (pReader->pRsmaReader) {
rsmaSnapReaderClose(&pReader->pRsmaReader);
}
if (pReader->pTsdbReader) { if (pReader->pTsdbReader) {
tsdbSnapReaderClose(&pReader->pTsdbReader); tsdbSnapReaderClose(&pReader->pTsdbReader);
} }
...@@ -99,7 +104,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) ...@@ -99,7 +104,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (!pReader->tsdbDone) { if (!pReader->tsdbDone) {
// open if not // open if not
if (pReader->pTsdbReader == NULL) { if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader); code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, &pReader->pTsdbReader);
if (code) goto _err; if (code) goto _err;
} }
...@@ -118,40 +123,26 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) ...@@ -118,40 +123,26 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
// RSMA ============== // RSMA ==============
#if 0 if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
if (VND_IS_RSMA(pReader->pVnode)) { // open if not
// RSMA1/RSMA2 if (pReader->pRsmaReader == NULL) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
if (!pReader->rsmaDone[i]) { if (code) goto _err;
if (!pReader->pVnode->pSma->pRSmaTsdb[i]) { }
// no valid tsdb
pReader->rsmaDone[i] = 1; code = rsmaSnapRead(pReader->pRsmaReader, ppData);
continue; if (code) {
} goto _err;
if (pReader->pTsdbReader == NULL) { } else {
code = tsdbSnapReaderOpen(pReader->pVnode->pSma->pRSmaTsdb[i], pReader->sver, pReader->ever, if (*ppData) {
&pReader->pTsdbReader); goto _exit;
if (code) goto _err; } else {
} pReader->tsdbDone = 1;
code = rsmaSnapReaderClose(&pReader->pRsmaReader);
code = tsdbSnapRead(pReader->pTsdbReader, ppData); if (code) goto _err;
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->tsdbDone = 1;
code = tsdbSnapReaderClose(&pReader->pTsdbReader);
if (code) goto _err;
}
}
} }
} }
// QTaskInfoFile
// TODO ...
} }
#endif
*ppData = NULL; *ppData = NULL;
*nData = 0; *nData = 0;
...@@ -186,6 +177,8 @@ struct SVSnapWriter { ...@@ -186,6 +177,8 @@ struct SVSnapWriter {
SMetaSnapWriter *pMetaSnapWriter; SMetaSnapWriter *pMetaSnapWriter;
// tsdb // tsdb
STsdbSnapWriter *pTsdbSnapWriter; STsdbSnapWriter *pTsdbSnapWriter;
// rsma
SRsmaSnapWriter *pRsmaSnapWriter;
}; };
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) { int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
...@@ -235,6 +228,11 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * ...@@ -235,6 +228,11 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (code) goto _err; if (code) goto _err;
} }
if (pWriter->pRsmaSnapWriter) {
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
if (code) goto _err;
}
if (!rollback) { if (!rollback) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
...@@ -282,28 +280,51 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ...@@ -282,28 +280,51 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index, vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
pHdr->type, nData); pHdr->type, nData);
if (pHdr->type == 0) { switch (pHdr->type) {
// meta case SNAP_DATA_META: {
// meta
if (pWriter->pMetaSnapWriter == NULL) {
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
if (code) goto _err;
}
if (pWriter->pMetaSnapWriter == NULL) { code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
if (code) goto _err; if (code) goto _err;
} } break;
case SNAP_DATA_TSDB: {
// tsdb
if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
if (code) goto _err;
}
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} else { } break;
// tsdb case SNAP_DATA_RSMA1:
case SNAP_DATA_RSMA2: {
// rsma1/rsma2
if (pWriter->pRsmaSnapWriter == NULL) {
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
if (code) goto _err;
}
if (pWriter->pTsdbSnapWriter == NULL) { code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
if (code) goto _err; if (code) goto _err;
} } break;
case SNAP_DATA_QTASK: {
// qtask for rsma
if (pWriter->pRsmaSnapWriter == NULL) {
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
if (code) goto _err;
}
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} break;
default:
break;
} }
_exit: _exit:
return code; return code;
......
...@@ -47,7 +47,7 @@ endi ...@@ -47,7 +47,7 @@ endi
$replica = 3 $replica = 3
$vgroups = 1 $vgroups = 1
$retentions = 5s:7d,15s:21d $retentions = 5s:7d,15s:21d,1m:365d
print ============= create database print ============= create database
sql create database db replica $replica vgroups $vgroups retentions $retentions sql create database db replica $replica vgroups $vgroups retentions $retentions
...@@ -114,7 +114,7 @@ endi ...@@ -114,7 +114,7 @@ endi
vg_ready: vg_ready:
print ====> create stable/child table print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s
sql show stables sql show stables
if $rows != 1 then if $rows != 1 then
...@@ -129,20 +129,28 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT ...@@ -129,20 +129,28 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT
sleep 3000 sleep 3000
print ===> write 100 records print ===> write 0-50 records
$N = 100 $ms = 0
$count = 0 $cnt = 0
while $count < $N while $cnt < 50
$ms = 1659000000000 + $count $ms = $cnt . m
sql insert into ct1 values( $ms , $count , 2.1, 3.1) sql insert into ct1 values (now + $ms , $cnt , 2.1, 3.1)
$count = $count + 1 $cnt = $cnt + 1
endw endw
print ===> flush database db
sql flush database db;
sleep 5000
print ===> write 51-100 records
while $cnt < 100
$ms = $cnt . m
sql insert into ct1 values (now + $ms , $cnt , 2.1, 3.1)
$cnt = $cnt + 1
endw
#sql flush database db; print ===> flush database db
sql flush database db;
sleep 5000
sleep 3000
print ===> stop dnode1 dnode2 dnode3 print ===> stop dnode1 dnode2 dnode3
...@@ -150,8 +158,6 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT ...@@ -150,8 +158,6 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT
sleep 10000
######################################################## ########################################################
print ===> start dnode1 dnode2 dnode3 dnode4 print ===> start dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
...@@ -164,7 +170,7 @@ sleep 3000 ...@@ -164,7 +170,7 @@ sleep 3000
print =============== query data print =============== query data
sql connect sql connect
sql use db sql use db
sql select * from ct1 sql select * from ct1 where ts > now - 1d
print rows: $rows print rows: $rows
print $data00 $data01 $data02 print $data00 $data01 $data02
if $rows != 100 then if $rows != 100 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册