提交 58374208 编写于 作者: H Hongze Cheng

more vnode snapshot writer

上级 eaa0a428
...@@ -24,12 +24,12 @@ extern "C" { ...@@ -24,12 +24,12 @@ extern "C" {
// tsdbDebug ================ // tsdbDebug ================
// clang-format off // clang-format off
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSD FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSD ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSD WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSD ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSD ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
typedef struct TSDBROW TSDBROW; typedef struct TSDBROW TSDBROW;
...@@ -137,7 +137,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData); ...@@ -137,7 +137,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema);
void tBlockDataClearData(SBlockData *pBlockData); void tBlockDataClearData(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
......
...@@ -476,9 +476,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -476,9 +476,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iFileSet >= 0) { if (--state->iFileSet >= 0) {
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
} else { } else {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
if (state->pBlockData) { if (state->pBlockData) {
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
...@@ -582,8 +582,8 @@ _err: ...@@ -582,8 +582,8 @@ _err:
state->aBlockIdx = NULL; state->aBlockIdx = NULL;
} }
if (state->pBlockData) { if (state->pBlockData) {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
...@@ -609,8 +609,8 @@ int32_t clearNextRowFromFS(void *iter) { ...@@ -609,8 +609,8 @@ int32_t clearNextRowFromFS(void *iter) {
state->aBlockIdx = NULL; state->aBlockIdx = NULL;
} }
if (state->pBlockData) { if (state->pBlockData) {
// tBlockDataClear(&state->blockData); // tBlockDataClear(&state->blockData, 1);
tBlockDataClear(state->pBlockData); tBlockDataClear(state->pBlockData, 1);
state->pBlockData = NULL; state->pBlockData = NULL;
} }
......
...@@ -1001,10 +1001,10 @@ _exit: ...@@ -1001,10 +1001,10 @@ _exit:
static void tsdbCommitDataEnd(SCommitter *pCommitter) { static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->aBlockIdx); taosArrayDestroy(pCommitter->aBlockIdx);
tMapDataClear(&pCommitter->oBlockMap); tMapDataClear(&pCommitter->oBlockMap);
tBlockDataClear(&pCommitter->oBlockData); tBlockDataClear(&pCommitter->oBlockData, 1);
taosArrayDestroy(pCommitter->aBlockIdxN); taosArrayDestroy(pCommitter->aBlockIdxN);
tMapDataClear(&pCommitter->nBlockMap); tMapDataClear(&pCommitter->nBlockMap);
tBlockDataClear(&pCommitter->nBlockData); tBlockDataClear(&pCommitter->nBlockData, 1);
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema);
} }
......
...@@ -979,21 +979,21 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl ...@@ -979,21 +979,21 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
code = tBlockDataCopy(pBlockData, pBlockData2); code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
} }
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
} }
tFree(pBuf1); tFree(pBuf1);
...@@ -1115,29 +1115,29 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p ...@@ -1115,29 +1115,29 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
code = tBlockDataCopy(pBlockData, pBlockData2); code = tBlockDataCopy(pBlockData, pBlockData2);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
// merge two block data // merge two block data
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
if (code) { if (code) {
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
goto _err; goto _err;
} }
} }
tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2); tBlockDataClear(pBlockData2, 1);
} }
ASSERT(pBlock->nRow == pBlockData->nRow); ASSERT(pBlock->nRow == pBlockData->nRow);
......
...@@ -296,8 +296,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -296,8 +296,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
} }
taosArrayDestroy(pReader->aBlockIdx); taosArrayDestroy(pReader->aBlockIdx);
tMapDataClear(&pReader->mBlock); tMapDataClear(&pReader->mBlock);
tBlockDataClear(&pReader->oBlockData); tBlockDataClear(&pReader->oBlockData, 1);
tBlockDataClear(&pReader->nBlockData); tBlockDataClear(&pReader->nBlockData, 1);
if (pReader->pDelFReader) { if (pReader->pDelFReader) {
tsdbDelFReaderClose(&pReader->pDelFReader); tsdbDelFReaderClose(&pReader->pDelFReader);
...@@ -555,11 +555,21 @@ _err: ...@@ -555,11 +555,21 @@ _err:
} }
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
int64_t skey; // todo SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
int64_t ekey; // todo TABLEID id = *(TABLEID*)(&pHdr[1]);
int64_t n;
SBlockData bData = {0};
SBlockData* pBlockData = &bData;
// decode
code = tBlockDataInit(pBlockData);
if (code) goto _err;
n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData);
ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData);
#if 0
int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision); int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision)); ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision));
...@@ -600,7 +610,10 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 ...@@ -600,7 +610,10 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
code = tsdbSnapWriteTableData(pWriter, pData, nData); code = tsdbSnapWriteTableData(pWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
#endif
tsdbInfo("vgId:%d vnode snapshot tsdb write data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pTsdb->pVnode),
id.suid, id.suid, pBlockData->nRow);
return code; return code;
_err: _err:
...@@ -755,6 +768,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -755,6 +768,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
pWriter->minutes = pTsdb->keepCfg.days;
pWriter->precision = pTsdb->keepCfg.precision;
pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return code;
...@@ -793,24 +812,29 @@ _err: ...@@ -793,24 +812,29 @@ _err:
} }
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
int8_t type = pData[0]; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
// ts data // ts data
if (type == 0) { if (pHdr->type == 1) {
code = tsdbSnapWriteData(pWriter, pData + 1, nData - 1); code = tsdbSnapWriteData(pWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
goto _exit;
} else { } else {
code = tsdbSnapWriteDataEnd(pWriter); if (pWriter->pDataFWriter) {
if (code) goto _err; code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err;
}
} }
// del data // del data
if (type == 1) { if (pHdr->type == 2) {
code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1); code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1);
if (code) goto _err; if (code) goto _err;
} }
_exit:
return code; return code;
_err: _err:
......
...@@ -1032,11 +1032,11 @@ void tBlockDataReset(SBlockData *pBlockData) { ...@@ -1032,11 +1032,11 @@ void tBlockDataReset(SBlockData *pBlockData) {
taosArrayClear(pBlockData->aIdx); taosArrayClear(pBlockData->aIdx);
} }
void tBlockDataClear(SBlockData *pBlockData) { void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY); tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aIdx); taosArrayDestroy(pBlockData->aIdx);
taosArrayDestroyEx(pBlockData->aColData, tColDataClear); taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL);
} }
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) {
......
...@@ -176,7 +176,6 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr ...@@ -176,7 +176,6 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
pWriter->ever = ever; pWriter->ever = ever;
vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode)); vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode));
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return code;
...@@ -189,15 +188,15 @@ _err: ...@@ -189,15 +188,15 @@ _err:
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) { int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
goto _exit; // TODO
if (rollback) { // if (rollback) {
code = vnodeSnapRollback(pWriter); // code = vnodeSnapRollback(pWriter);
if (code) goto _err; // if (code) goto _err;
} else { // } else {
code = vnodeSnapCommit(pWriter); // code = vnodeSnapCommit(pWriter);
if (code) goto _err; // if (code) goto _err;
} // }
_exit: _exit:
vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback); vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback);
...@@ -214,34 +213,40 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ...@@ -214,34 +213,40 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
// ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData); ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
ASSERT(pHdr->index == pWriter->index + 1);
pWriter->index = pHdr->index;
// if (pHdr->type == 0) { vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
// // meta pHdr->type, nData);
// if (pWriter->pMetaSnapWriter == NULL) {
// code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
// if (code) goto _err;
// }
// code = metaSnapWrite(pWriter->pMetaSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); if (pHdr->type == 0) {
// if (code) goto _err; // meta
// } else {
// // tsdb
// if (pWriter->pTsdbSnapWriter == NULL) {
// code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
// if (code) goto _err;
// }
// code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); // if (pWriter->pMetaSnapWriter == NULL) {
// if (code) goto _err; // code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
// } // if (code) goto _err;
// }
// code = metaSnapWrite(pWriter->pMetaSnapWriter, pData , nData);
// if (code) goto _err;
} else {
// tsdb
if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
if (code) goto _err;
}
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
if (code) goto _err;
}
_exit: _exit:
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
pHdr->type, nData);
return code; return code;
_err: _err:
vError("vgId:%d vnode snapshot write failed since %s", TD_VID(pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot write failed since %s, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode),
tstrerror(code), pHdr->index, pHdr->type, nData);
return code; return code;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册