提交 4e2c2b80 编写于 作者: H Hongze Cheng

more work

上级 edb3c069
...@@ -71,6 +71,7 @@ int32_t* taosGetErrno(); ...@@ -71,6 +71,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029) #define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030) #define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) #define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0032)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......
...@@ -22,6 +22,10 @@ ...@@ -22,6 +22,10 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_OFFSET_U8 ((uint8_t)0x1)
#define TSDB_OFFSET_U16 ((uint8_t)0x2)
#define TSDB_OFFSET_U32 ((uint8_t)0x4)
// tsdbDebug ================ // tsdbDebug ================
// clang-format off // clang-format off
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
...@@ -150,7 +154,8 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2); ...@@ -150,7 +154,8 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
// SDelIdx // SDelIdx
int32_t tDelIdxGetSize(SDelIdx *pDelIdx); int32_t tDelIdxGetSize(SDelIdx *pDelIdx);
int32_t tDelIdxGetItem(SDelIdx *pDelIdx, int32_t idx, SDelIdxItem *pItem); int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id);
int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx);
int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem); int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem);
int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx); int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx);
int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx); int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx);
...@@ -375,7 +380,7 @@ struct SDelIdxItem { ...@@ -375,7 +380,7 @@ struct SDelIdxItem {
struct SDelIdx { struct SDelIdx {
uint32_t delimiter; uint32_t delimiter;
uint8_t flags; uint8_t flags;
uint32_t nOffset; uint32_t nItem;
uint8_t *pOffset; uint8_t *pOffset;
uint32_t nData; uint32_t nData;
uint8_t *pData; uint8_t *pData;
......
...@@ -42,10 +42,10 @@ struct SCommitter { ...@@ -42,10 +42,10 @@ struct SCommitter {
/* commit del */ /* commit del */
SDelFReader *pDelFReader; SDelFReader *pDelFReader;
SDelFWriter *pDelFWriter; SDelFWriter *pDelFWriter;
SDelIdx oDelIdx; SDelIdx delIdxOld;
SDelIdx nDelIdx; SDelIdx delIdxNew;
SDelData oDelData; SDelData delDataOld;
SDelData nDelData; SDelData delDataNew;
SDelIdxItem delIdxItem; SDelIdxItem delIdxItem;
/* commit cache */ /* commit cache */
}; };
...@@ -175,21 +175,21 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { ...@@ -175,21 +175,21 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
SDelFile *pDelFileW = NULL; // TODO SDelFile *pDelFileW = NULL; // TODO
// load old // load old
pCommitter->oDelIdx = (SDelIdx){0}; pCommitter->delIdxOld = (SDelIdx){0};
if (pDelFileR) { if (pDelFileR) {
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
if (code) { if (code) {
goto _err; goto _err;
} }
code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdx, &pCommitter->pBuf1); code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->delIdxOld, &pCommitter->pBuf1);
if (code) { if (code) {
goto _err; goto _err;
} }
} }
// prepare new // prepare new
pCommitter->nDelIdx = (SDelIdx){0}; pCommitter->delIdxNew = (SDelIdx){0};
code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb); code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb);
if (code) { if (code) {
goto _err; goto _err;
...@@ -245,7 +245,7 @@ _err: ...@@ -245,7 +245,7 @@ _err:
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdx, NULL); code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->delIdxNew, NULL);
if (code) { if (code) {
goto _err; goto _err;
} }
...@@ -304,7 +304,7 @@ _exit: ...@@ -304,7 +304,7 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d commit del data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -543,14 +543,14 @@ static int32_t tsdbCommitTableDelStart(SCommitter *pCommitter) { ...@@ -543,14 +543,14 @@ static int32_t tsdbCommitTableDelStart(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
// load old // load old
pCommitter->oDelData = (SDelData){0}; pCommitter->delDataOld = (SDelData){0};
if (0) { if (0) {
code = tsdbReadDelData(pCommitter->pDelFReader, NULL /*TODO*/, &pCommitter->oDelData, &pCommitter->pBuf4); code = tsdbReadDelData(pCommitter->pDelFReader, NULL /*TODO*/, &pCommitter->delDataOld, &pCommitter->pBuf4);
if (code) goto _err; if (code) goto _err;
} }
// prepare new // prepare new
pCommitter->nDelData = (SDelData){0}; pCommitter->delDataNew = (SDelData){0};
return code; return code;
...@@ -566,7 +566,7 @@ static int32_t tsdbCommitTableDelImpl(SCommitter *pCommitter) { ...@@ -566,7 +566,7 @@ static int32_t tsdbCommitTableDelImpl(SCommitter *pCommitter) {
for (; pDelOp; pDelOp = pDelOp->pNext) { for (; pDelOp; pDelOp = pDelOp->pNext) {
SDelDataItem item = {.version = pDelOp->version, .sKey = pDelOp->sKey, .eKey = pDelOp->eKey}; SDelDataItem item = {.version = pDelOp->version, .sKey = pDelOp->sKey, .eKey = pDelOp->eKey};
code = tDelDataPutItem(&pCommitter->nDelData, &item); code = tDelDataPutItem(&pCommitter->delDataNew, &item);
if (code) goto _err; if (code) goto _err;
} }
...@@ -580,11 +580,11 @@ static int32_t tsdbCommitTableDelEnd(SCommitter *pCommitter) { ...@@ -580,11 +580,11 @@ static int32_t tsdbCommitTableDelEnd(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
// write table del data // write table del data
code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelData, NULL); code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->delDataNew, NULL);
if (code) goto _err; if (code) goto _err;
// add SDelIdxItem // add SDelIdxItem
code = tDelIdxPutItem(&pCommitter->nDelIdx, &pCommitter->delIdxItem); code = tDelIdxPutItem(&pCommitter->delIdxNew, &pCommitter->delIdxItem);
if (code) goto _err; if (code) goto _err;
return code; return code;
......
...@@ -410,7 +410,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf) ...@@ -410,7 +410,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SDelIdx *pDelIdx, uint8_t **ppBuf)
int32_t n = tGetDelIdx(*ppBuf, pDelIdx); int32_t n = tGetDelIdx(*ppBuf, pDelIdx);
ASSERT(n == size - sizeof(TSCKSUM)); ASSERT(n == size - sizeof(TSCKSUM));
ASSERT(pDelIdx->delimiter == TSDB_FILE_DLMT); ASSERT(pDelIdx->delimiter == TSDB_FILE_DLMT);
ASSERT(pDelIdx->nOffset > 0 && pDelIdx->nData > 0); // ASSERT(pDelIdx->nOffset > 0 && pDelIdx->nData > 0);
return code; return code;
......
...@@ -15,6 +15,30 @@ ...@@ -15,6 +15,30 @@
#include "tsdb.h" #include "tsdb.h"
static FORCE_INLINE int32_t tsdbOffsetSize(uint8_t flags) {
if (flags & TSDB_OFFSET_U8) {
return sizeof(uint8_t);
} else if (flags & TSDB_OFFSET_U16) {
return sizeof(uint16_t);
} else if (flags & TSDB_OFFSET_U32) {
return sizeof(uint32_t);
} else {
ASSERT(0);
}
}
static FORCE_INLINE uint32_t tsdbGetOffset(uint8_t *pOffset, uint8_t flags, int32_t idx) {
if (flags & TSDB_OFFSET_U8) {
return ((uint8_t *)pOffset)[idx];
} else if (flags & TSDB_OFFSET_U16) {
return ((uint16_t *)pOffset)[idx];
} else if (flags & TSDB_OFFSET_U32) {
return ((uint32_t *)pOffset)[idx];
} else {
ASSERT(0);
}
}
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size) { int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size) {
int32_t code = 0; int32_t code = 0;
int64_t bsize = 0; int64_t bsize = 0;
...@@ -88,6 +112,37 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { ...@@ -88,6 +112,37 @@ int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
return 0; return 0;
} }
// SDelIdxItem ======================================================
static FORCE_INLINE int32_t tPutDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) {
int32_t n = 0;
n += tPutI64(p ? p + n : p, pDelIdxItem->suid);
n += tPutI64(p ? p + n : p, pDelIdxItem->uid);
n += tPutI64(p ? p + n : p, pDelIdxItem->minKey);
n += tPutI64(p ? p + n : p, pDelIdxItem->maxKey);
n += tPutI64v(p ? p + n : p, pDelIdxItem->minVersion);
n += tPutI64v(p ? p + n : p, pDelIdxItem->maxVersion);
n += tPutI64v(p ? p + n : p, pDelIdxItem->offset);
n += tPutI64v(p ? p + n : p, pDelIdxItem->size);
return n;
}
static FORCE_INLINE int32_t tGetDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) {
int32_t n = 0;
n += tGetI64(p, &pDelIdxItem->suid);
n += tGetI64(p, &pDelIdxItem->uid);
n += tGetI64(p, &pDelIdxItem->minKey);
n += tGetI64(p, &pDelIdxItem->maxKey);
n += tGetI64v(p, &pDelIdxItem->minVersion);
n += tGetI64v(p, &pDelIdxItem->maxVersion);
n += tGetI64v(p, &pDelIdxItem->offset);
n += tGetI64v(p, &pDelIdxItem->size);
return n;
}
// SDelIdx ====================================================== // SDelIdx ======================================================
int32_t tDelIdxGetSize(SDelIdx *pDelIdx) { int32_t tDelIdxGetSize(SDelIdx *pDelIdx) {
int32_t code = 0; int32_t code = 0;
...@@ -95,15 +150,62 @@ int32_t tDelIdxGetSize(SDelIdx *pDelIdx) { ...@@ -95,15 +150,62 @@ int32_t tDelIdxGetSize(SDelIdx *pDelIdx) {
return code; return code;
} }
int32_t tDelIdxGetItem(SDelIdx *pDelIdx, int32_t idx, SDelIdxItem *pItem) { int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id) {
int32_t code = 0; int32_t code = 0;
// TODO int32_t lidx = 0;
int32_t ridx = pDelIdx->nItem - 1;
int32_t midx;
uint64_t offset;
int32_t c;
while (lidx <= ridx) {
midx = (lidx + ridx) / 2;
tDelIdxGetItemByIdx(pDelIdx, pItem, midx);
c = tTABLEIDCmprFn(&id, pItem);
if (c == 0) {
goto _exit;
} else if (c < 0) {
ridx = midx - 1;
} else {
lidx = midx + 1;
}
}
code = TSDB_CODE_NOT_FOUND;
_exit:
return code;
}
int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx) {
int32_t code = 0;
tGetDelIdxItem(pDelIdx->pData + tsdbGetOffset(pDelIdx->pOffset, pDelIdx->flags, idx), pItem);
return code; return code;
} }
int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem) { int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem) {
int32_t code = 0; int32_t code = 0;
// TODO int32_t size = tPutDelIdxItem(NULL, pItem);
uint32_t offset = pDelIdx->nData;
uint32_t nItem = pDelIdx->nItem;
pDelIdx->nItem++;
pDelIdx->nData += size;
// alloc
code = tsdbRealloc(&pDelIdx->pOffset, pDelIdx->nItem * sizeof(uint32_t));
if (code) goto _exit;
code = tsdbRealloc(&pDelIdx->pData, pDelIdx->nData);
if (code) goto _exit;
// put
((uint32_t *)pDelIdx->pOffset)[nItem] = offset;
tPutDelIdxItem(pDelIdx->pData + offset, pItem);
_exit:
return code; return code;
} }
...@@ -112,7 +214,8 @@ int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx) { ...@@ -112,7 +214,8 @@ int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
n += tPutU32(p ? p + n : p, pDelIdx->delimiter); n += tPutU32(p ? p + n : p, pDelIdx->delimiter);
n += tPutU8(p ? p + n : p, pDelIdx->flags); n += tPutU8(p ? p + n : p, pDelIdx->flags);
n += tPutBinary(p ? p + n : p, pDelIdx->pOffset, pDelIdx->nOffset); n += tPutU32v(p ? p + n : p, pDelIdx->nItem);
n += tPutBinary(p ? p + n : p, pDelIdx->pOffset, pDelIdx->nItem * tsdbOffsetSize(pDelIdx->flags));
n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData); n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData);
return n; return n;
...@@ -123,7 +226,8 @@ int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx) { ...@@ -123,7 +226,8 @@ int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
n += tGetU32(p, &pDelIdx->delimiter); n += tGetU32(p, &pDelIdx->delimiter);
n += tGetU8(p, &pDelIdx->flags); n += tGetU8(p, &pDelIdx->flags);
n += tGetBinary(p, &pDelIdx->pOffset, &pDelIdx->nOffset); n += tGetU32v(p, &pDelIdx->nItem);
n += tGetBinary(p, &pDelIdx->pOffset, NULL);
n += tGetBinary(p, &pDelIdx->pData, &pDelIdx->nData); n += tGetBinary(p, &pDelIdx->pData, &pDelIdx->nData);
return n; return n;
......
...@@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") ...@@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册