提交 39626c1b 编写于 作者: H Hongze Cheng

more work

上级 2f87d80d
......@@ -78,6 +78,12 @@ int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx;
// SDataFWriter
typedef struct SDataFWriter SDataFWriter;
// SDataFReader
typedef struct SDataFReader SDataFReader;
// SDelFWriter
typedef struct SDelFWriter SDelFWriter;
......@@ -89,11 +95,17 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SDelIdx *pDelIdx, uint8_t **ppBuf)
// SDelFReader
typedef struct SDelFReader SDelFReader;
int32_t tsdbDelFReaderOpen(SDelFReader *pReader, SDelFile *pFile);
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile);
int32_t tsdbDelFReaderClose(SDelFReader *pReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelData *pDelData, uint8_t **ppBuf);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf);
// SCacheFWriter
typedef struct SCacheFWriter SCacheFWriter;
// SCacheFReader
typedef struct SCacheFReader SCacheFReader;
// tsdbCommit.c ==============================================================================================
// old
......@@ -314,7 +326,7 @@ struct SMemTable {
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t nRow;
int64_t nDelOp;
int64_t nDel;
SArray *aTbData; // SArray<STbData*>
};
......@@ -782,10 +794,11 @@ typedef struct {
struct SDelIdx {
uint32_t delimiter;
int8_t flags;
int64_t nItem;
uint8_t flags;
uint32_t nOffset;
uint8_t *pOffset;
uint8_t *pDelIdxItem;
uint32_t nData;
uint8_t *pData;
};
#endif
......
......@@ -21,6 +21,7 @@ struct SCommitter {
STsdb *pTsdb;
uint8_t *pBuf1;
uint8_t *pBuf2;
uint8_t *pBuf3;
/* commit data */
int32_t minutes;
int8_t precision;
......@@ -37,12 +38,10 @@ struct SCommitter {
STbData *pTbData;
SBlockIdx *pBlockIdx;
/* commit del */
SDelFReader *pTombstoneReader;
SDelFWriter *pTombstoneWritter;
SDelIdx delIdxO;
SDelIdx delIdxN;
SDelData delDataO;
SDelData delDataN;
SDelFReader *pDelFReader;
SDelFWriter *pDelFWriter;
SDelIdx oDelIdx;
SDelIdx nDelIdx;
/* commit cache */
};
......@@ -164,8 +163,35 @@ _err:
}
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
SDelFile *pDelFileR = NULL; // TODO
SDelFile *pDelFileW = NULL; // TODO
if (pDelFileR) {
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR);
if (code) {
goto _err;
}
code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdx, &pCommitter->pBuf1);
if (code) {
goto _err;
}
}
code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW);
if (code) {
goto _err;
}
_exit:
tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
return code;
_err:
tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -182,36 +208,36 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
SDelIdx *pDelIdx = NULL;
while (iTbData < nTbData || iDelIdx < nDelIdx) {
if (iTbData < nTbData) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
} else {
pTbData = NULL;
}
if (iDelIdx < nDelIdx) {
// pDelIdx = ; // TODO
} else {
pDelIdx = NULL;
}
if (pTbData && pDelIdx) {
c = tTABLEIDCmprFn(pTbData, pDelIdx);
if (c == 0) {
iTbData++;
iDelIdx++;
} else if (c < 0) {
iTbData++;
pDelIdx = NULL;
} else {
iDelIdx++;
pTbData = NULL;
}
} else {
if (pTbData) {
iTbData++;
} else {
iDelIdx++;
}
}
// if (iTbData < nTbData) {
// pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
// } else {
// pTbData = NULL;
// }
// if (iDelIdx < nDelIdx) {
// // pDelIdx = ; // TODO
// } else {
// pDelIdx = NULL;
// }
// if (pTbData && pDelIdx) {
// c = tTABLEIDCmprFn(pTbData, pDelIdx);
// if (c == 0) {
// iTbData++;
// iDelIdx++;
// } else if (c < 0) {
// iTbData++;
// pDelIdx = NULL;
// } else {
// iDelIdx++;
// pTbData = NULL;
// }
// } else {
// if (pTbData) {
// iTbData++;
// } else {
// iDelIdx++;
// }
// }
// TODO: commit with the pTbData and pDelIdx
}
......@@ -222,8 +248,6 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
int32_t code = 0;
ASSERT(pCommitter->delIdxN.nItem > 0);
return code;
_err:
......@@ -235,7 +259,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
if (pMemTable->nDelOp == 0) {
if (pMemTable->nDel == 0) {
goto _exit;
}
......@@ -258,9 +282,11 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
}
_exit:
tsdbDebug("vgId:%d commit del data, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
return code;
_err:
tsdbError("vgId:%d failed to commit del data since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -47,7 +47,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
pMemTable->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX};
pMemTable->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1};
pMemTable->nRow = 0;
pMemTable->nDelOp = 0;
pMemTable->nDel = 0;
pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *));
if (pMemTable->aTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -174,7 +174,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
// update the state of pMemTable and other (todo)
pMemTable->nDelOp++;
pMemTable->nDel++;
tsdbError("vgId:%d delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" since %s",
......
......@@ -109,7 +109,7 @@ struct SDelFReader {
TdFilePtr pReadH;
};
int32_t tsdbDelFReaderOpen(SDelFReader *pReader, SDelFile *pFile) {
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile) {
int32_t code = 0;
// TODO
return code;
......
......@@ -51,4 +51,26 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
}
return 0;
}
int32_t tPutSDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
int32_t n = 0;
n += tPutU32(p ? p + n : p, pDelIdx->delimiter);
n += tPutU8(p ? p + n : p, pDelIdx->flags);
n += tPutBinary(p ? p + n : p, pDelIdx->pOffset, pDelIdx->nOffset);
n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData);
return n;
}
int32_t tGetSDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
int32_t n = 0;
n += tGetU32(p + n, &pDelIdx->delimiter);
n += tGetU8(p + n, &pDelIdx->flags);
n += tGetBinary(p + n, &pDelIdx->pOffset, &pDelIdx->nOffset);
n += tGetBinary(p + n, &pDelIdx->pData, &pDelIdx->nData);
return n;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册