From 39626c1bc6fe3a1f2247c0156f487b8c4f95f592 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 10 Jun 2022 11:50:06 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 23 +++- source/dnode/vnode/src/tsdb/tsdbCommit.c | 108 +++++++++++------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 4 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 22 ++++ 5 files changed, 110 insertions(+), 49 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 6acae4a4d5..8172ed4075 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -78,6 +78,12 @@ int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); typedef struct SDelData SDelData; typedef struct SDelIdx SDelIdx; +// SDataFWriter +typedef struct SDataFWriter SDataFWriter; + +// SDataFReader +typedef struct SDataFReader SDataFReader; + // SDelFWriter typedef struct SDelFWriter SDelFWriter; @@ -89,11 +95,17 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf) // SDelFReader typedef struct SDelFReader SDelFReader; -int32_t tsdbDelFReaderOpen(SDelFReader *pReader, SDelFile *pFile); +int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile); int32_t tsdbDelFReaderClose(SDelFReader *pReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBuf); int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf); +// SCacheFWriter +typedef struct SCacheFWriter SCacheFWriter; + +// SCacheFReader +typedef struct SCacheFReader SCacheFReader; + // tsdbCommit.c ============================================================================================== // old @@ -314,7 +326,7 @@ struct SMemTable { TSDBKEY minKey; TSDBKEY maxKey; int64_t nRow; - int64_t nDelOp; + int64_t nDel; SArray *aTbData; // SArray }; @@ -782,10 +794,11 @@ typedef struct { struct SDelIdx { uint32_t delimiter; - int8_t flags; - int64_t nItem; + uint8_t flags; + uint32_t nOffset; uint8_t *pOffset; - uint8_t *pDelIdxItem; + uint32_t nData; + uint8_t *pData; }; #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 8fe7b252fa..c664a53d70 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -21,6 +21,7 @@ struct SCommitter { STsdb *pTsdb; uint8_t *pBuf1; uint8_t *pBuf2; + uint8_t *pBuf3; /* commit data */ int32_t minutes; int8_t precision; @@ -37,12 +38,10 @@ struct SCommitter { STbData *pTbData; SBlockIdx *pBlockIdx; /* commit del */ - SDelFReader *pTombstoneReader; - SDelFWriter *pTombstoneWritter; - SDelIdx delIdxO; - SDelIdx delIdxN; - SDelData delDataO; - SDelData delDataN; + SDelFReader *pDelFReader; + SDelFWriter *pDelFWriter; + SDelIdx oDelIdx; + SDelIdx nDelIdx; /* commit cache */ }; @@ -164,8 +163,35 @@ _err: } static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { - int32_t code = 0; - // TODO + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SMemTable *pMemTable = pTsdb->imem; + SDelFile *pDelFileR = NULL; // TODO + SDelFile *pDelFileW = NULL; // TODO + + if (pDelFileR) { + code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR); + if (code) { + goto _err; + } + + code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdx, &pCommitter->pBuf1); + if (code) { + goto _err; + } + } + + code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW); + if (code) { + goto _err; + } + +_exit: + tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode)); + return code; + +_err: + tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -182,36 +208,36 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { SDelIdx *pDelIdx = NULL; while (iTbData < nTbData || iDelIdx < nDelIdx) { - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; - } - if (iDelIdx < nDelIdx) { - // pDelIdx = ; // TODO - } else { - pDelIdx = NULL; - } - - if (pTbData && pDelIdx) { - c = tTABLEIDCmprFn(pTbData, pDelIdx); - if (c == 0) { - iTbData++; - iDelIdx++; - } else if (c < 0) { - iTbData++; - pDelIdx = NULL; - } else { - iDelIdx++; - pTbData = NULL; - } - } else { - if (pTbData) { - iTbData++; - } else { - iDelIdx++; - } - } + // if (iTbData < nTbData) { + // pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + // } else { + // pTbData = NULL; + // } + // if (iDelIdx < nDelIdx) { + // // pDelIdx = ; // TODO + // } else { + // pDelIdx = NULL; + // } + + // if (pTbData && pDelIdx) { + // c = tTABLEIDCmprFn(pTbData, pDelIdx); + // if (c == 0) { + // iTbData++; + // iDelIdx++; + // } else if (c < 0) { + // iTbData++; + // pDelIdx = NULL; + // } else { + // iDelIdx++; + // pTbData = NULL; + // } + // } else { + // if (pTbData) { + // iTbData++; + // } else { + // iDelIdx++; + // } + // } // TODO: commit with the pTbData and pDelIdx } @@ -222,8 +248,6 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { int32_t code = 0; - ASSERT(pCommitter->delIdxN.nItem > 0); - return code; _err: @@ -235,7 +259,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; - if (pMemTable->nDelOp == 0) { + if (pMemTable->nDel == 0) { goto _exit; } @@ -258,9 +282,11 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { } _exit: + tsdbDebug("vgId:%d commit del data, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); return code; _err: + tsdbError("vgId:%d failed to commit del data since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index ffbef4e765..7ffff7a5b7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -47,7 +47,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { pMemTable->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX}; pMemTable->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1}; pMemTable->nRow = 0; - pMemTable->nDelOp = 0; + pMemTable->nDel = 0; pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *)); if (pMemTable->aTbData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -174,7 +174,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid // update the state of pMemTable and other (todo) - pMemTable->nDelOp++; + pMemTable->nDel++; tsdbError("vgId:%d delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 " since %s", diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 6d010b1614..d4adb08b4c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -109,7 +109,7 @@ struct SDelFReader { TdFilePtr pReadH; }; -int32_t tsdbDelFReaderOpen(SDelFReader *pReader, SDelFile *pFile) { +int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile) { int32_t code = 0; // TODO return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 238cf431c3..5d9924bfea 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -51,4 +51,26 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { } return 0; +} + +int32_t tPutSDelIdx(uint8_t *p, SDelIdx *pDelIdx) { + int32_t n = 0; + + n += tPutU32(p ? p + n : p, pDelIdx->delimiter); + n += tPutU8(p ? p + n : p, pDelIdx->flags); + n += tPutBinary(p ? p + n : p, pDelIdx->pOffset, pDelIdx->nOffset); + n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData); + + return n; +} + +int32_t tGetSDelIdx(uint8_t *p, SDelIdx *pDelIdx) { + int32_t n = 0; + + n += tGetU32(p + n, &pDelIdx->delimiter); + n += tGetU8(p + n, &pDelIdx->flags); + n += tGetBinary(p + n, &pDelIdx->pOffset, &pDelIdx->nOffset); + n += tGetBinary(p + n, &pDelIdx->pData, &pDelIdx->nData); + + return n; } \ No newline at end of file -- GitLab