提交 cc764032 编写于 作者: H Hongze Cheng

more work

上级 a26ccade
......@@ -22,10 +22,6 @@
extern "C" {
#endif
#define TSDB_OFFSET_U8 ((uint8_t)0x1)
#define TSDB_OFFSET_U16 ((uint8_t)0x2)
#define TSDB_OFFSET_U32 ((uint8_t)0x4)
// tsdbDebug ================
// clang-format off
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
......@@ -48,6 +44,7 @@ typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct SMergeInfo SMergeInfo;
typedef struct STable STable;
typedef struct SOffset SOffset;
// tsdbMemTable ==============================================================================================
......@@ -153,16 +150,16 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
// SDelIdx
int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id);
int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx);
int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem);
int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx);
int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id);
int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx);
int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx);
// SDelData
int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version);
int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx);
int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem);
int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx);
int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version);
int32_t tPutDelData(uint8_t *p, SDelData *pDelData);
int32_t tGetDelData(uint8_t *p, SDelData *pDelData);
......@@ -170,6 +167,12 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile);
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile);
// structs
struct SOffset {
int32_t nOffset;
uint8_t flag;
uint8_t *pOffset;
};
typedef struct {
int minFid;
int midFid;
......@@ -358,9 +361,7 @@ struct SDelData {
uint32_t delimiter;
tb_uid_t suid;
tb_uid_t uid;
uint8_t flags;
uint32_t nItem;
uint8_t *pOffset;
SOffset offset;
uint32_t nData;
uint8_t *pData;
};
......@@ -378,9 +379,7 @@ struct SDelIdxItem {
struct SDelIdx {
uint32_t delimiter;
uint8_t flags;
uint32_t nItem;
uint8_t *pOffset;
SOffset offset;
uint32_t nData;
uint8_t *pData;
};
......
......@@ -15,28 +15,118 @@
#include "tsdb.h"
static FORCE_INLINE int32_t tsdbOffsetSize(uint8_t flags) {
if (flags & TSDB_OFFSET_U8) {
return sizeof(uint8_t);
} else if (flags & TSDB_OFFSET_U16) {
return sizeof(uint16_t);
} else if (flags & TSDB_OFFSET_U32) {
return sizeof(uint32_t);
} else {
ASSERT(0);
#define TSDB_OFFSET_I32 ((uint8_t)0)
#define TSDB_OFFSET_I16 ((uint8_t)1)
#define TSDB_OFFSET_I8 ((uint8_t)2)
static FORCE_INLINE int32_t tsdbOffsetSize(SOffset *pOfst) {
switch (pOfst->flag) {
case TSDB_OFFSET_I32:
return sizeof(int32_t);
case TSDB_OFFSET_I16:
return sizeof(int16_t);
case TSDB_OFFSET_I8:
return sizeof(int8_t);
default:
ASSERT(0);
}
}
static FORCE_INLINE uint32_t tsdbGetOffset(uint8_t *pOffset, uint8_t flags, int32_t idx) {
if (flags & TSDB_OFFSET_U8) {
return ((uint8_t *)pOffset)[idx];
} else if (flags & TSDB_OFFSET_U16) {
return ((uint16_t *)pOffset)[idx];
} else if (flags & TSDB_OFFSET_U32) {
return ((uint32_t *)pOffset)[idx];
static FORCE_INLINE int32_t tsdbGetOffset(SOffset *pOfst, int32_t idx) {
int32_t offset = -1;
if (idx >= 0 && idx < pOfst->nOffset) {
switch (pOfst->flag) {
case TSDB_OFFSET_I32:
offset = ((int32_t *)pOfst->pOffset)[idx];
break;
case TSDB_OFFSET_I16:
offset = ((int16_t *)pOfst->pOffset)[idx];
break;
case TSDB_OFFSET_I8:
offset = ((int8_t *)pOfst->pOffset)[idx];
break;
default:
ASSERT(0);
}
ASSERT(offset >= 0);
}
return offset;
}
static FORCE_INLINE int32_t tsdbAddOffset(SOffset *pOfst, int32_t offset) {
int32_t code = 0;
int32_t nOffset = pOfst->nOffset;
ASSERT(pOfst->flag == TSDB_OFFSET_I32);
ASSERT(offset >= 0);
pOfst->nOffset++;
// alloc
code = tsdbRealloc(&pOfst->pOffset, sizeof(int32_t) * pOfst->nOffset);
if (code) goto _exit;
// put
((int32_t *)pOfst->pOffset)[nOffset] = offset;
_exit:
return code;
}
static FORCE_INLINE int32_t tPutOffset(uint8_t *p, SOffset *pOfst) {
int32_t n = 0;
int32_t maxOffset;
ASSERT(pOfst->flag == TSDB_OFFSET_I32);
ASSERT(pOfst->nOffset > 0);
maxOffset = tsdbGetOffset(pOfst, pOfst->nOffset - 1);
n += tPutI32v(p ? p + n : p, pOfst->nOffset);
if (maxOffset <= INT8_MAX) {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8);
for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) {
n += tPutI8(p ? p + n : p, (int8_t)tsdbGetOffset(pOfst, iOffset));
}
} else if (maxOffset <= INT16_MAX) {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16);
for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) {
n += tPutI16(p ? p + n : p, (int16_t)tsdbGetOffset(pOfst, iOffset));
}
} else {
ASSERT(0);
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32);
for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) {
n += tPutI32(p ? p + n : p, (int32_t)tsdbGetOffset(pOfst, iOffset));
}
}
return n;
}
static FORCE_INLINE int32_t tGetOffset(uint8_t *p, SOffset *pOfst) {
int32_t n = 0;
n += tGetI32v(p + n, &pOfst->nOffset);
n += tGetU8(p + n, &pOfst->flag);
pOfst->pOffset = p + n;
switch (pOfst->flag) {
case TSDB_OFFSET_I32:
n = n + pOfst->nOffset + sizeof(int32_t);
break;
case TSDB_OFFSET_I16:
n = n + pOfst->nOffset + sizeof(int16_t);
break;
case TSDB_OFFSET_I8:
n = n + pOfst->nOffset + sizeof(int8_t);
break;
default:
ASSERT(0);
}
return n;
}
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size) {
......@@ -131,23 +221,59 @@ static FORCE_INLINE int32_t tPutDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem)
static FORCE_INLINE int32_t tGetDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) {
int32_t n = 0;
n += tGetI64(p, &pDelIdxItem->suid);
n += tGetI64(p, &pDelIdxItem->uid);
n += tGetI64(p, &pDelIdxItem->minKey);
n += tGetI64(p, &pDelIdxItem->maxKey);
n += tGetI64v(p, &pDelIdxItem->minVersion);
n += tGetI64v(p, &pDelIdxItem->maxVersion);
n += tGetI64v(p, &pDelIdxItem->offset);
n += tGetI64v(p, &pDelIdxItem->size);
n += tGetI64(p + n, &pDelIdxItem->suid);
n += tGetI64(p + n, &pDelIdxItem->uid);
n += tGetI64(p + n, &pDelIdxItem->minKey);
n += tGetI64(p + n, &pDelIdxItem->maxKey);
n += tGetI64v(p + n, &pDelIdxItem->minVersion);
n += tGetI64v(p + n, &pDelIdxItem->maxVersion);
n += tGetI64v(p + n, &pDelIdxItem->offset);
n += tGetI64v(p + n, &pDelIdxItem->size);
return n;
}
// SDelIdx ======================================================
int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem) {
int32_t code = 0;
uint32_t offset = pDelIdx->nData;
// offset
code = tsdbAddOffset(&pDelIdx->offset, offset);
if (code) goto _exit;
// alloc
pDelIdx->nData += tPutDelIdxItem(NULL, pItem);
code = tsdbRealloc(&pDelIdx->pData, pDelIdx->nData);
if (code) goto _exit;
// put
tPutDelIdxItem(pDelIdx->pData + offset, pItem);
_exit:
return code;
}
int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx) {
int32_t code = 0;
int32_t offset;
offset = tsdbGetOffset(&pDelIdx->offset, idx);
if (offset < 0) {
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
tGetDelIdxItem(pDelIdx->pData + offset, pItem);
_exit:
return code;
}
int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id) {
int32_t code = 0;
int32_t lidx = 0;
int32_t ridx = pDelIdx->nItem - 1;
int32_t ridx = pDelIdx->offset.nOffset - 1;
int32_t midx;
uint64_t offset;
int32_t c;
......@@ -155,7 +281,9 @@ int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id) {
while (lidx <= ridx) {
midx = (lidx + ridx) / 2;
tDelIdxGetItemByIdx(pDelIdx, pItem, midx);
code = tDelIdxGetItemByIdx(pDelIdx, pItem, midx);
if (code) goto _exit;
c = tTABLEIDCmprFn(&id, pItem);
if (c == 0) {
goto _exit;
......@@ -172,41 +300,11 @@ _exit:
return code;
}
int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx) {
tGetDelIdxItem(pDelIdx->pData + tsdbGetOffset(pDelIdx->pOffset, pDelIdx->flags, idx), pItem);
return 0;
}
int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem) {
int32_t code = 0;
int32_t size = tPutDelIdxItem(NULL, pItem);
uint32_t offset = pDelIdx->nData;
uint32_t nItem = pDelIdx->nItem;
pDelIdx->nItem++;
pDelIdx->nData += size;
// alloc
code = tsdbRealloc(&pDelIdx->pOffset, pDelIdx->nItem * sizeof(uint32_t));
if (code) goto _exit;
code = tsdbRealloc(&pDelIdx->pData, pDelIdx->nData);
if (code) goto _exit;
// put
((uint32_t *)pDelIdx->pOffset)[nItem] = offset;
tPutDelIdxItem(pDelIdx->pData + offset, pItem);
_exit:
return code;
}
int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
int32_t n = 0;
n += tPutU32(p ? p + n : p, pDelIdx->delimiter);
n += tPutU8(p ? p + n : p, pDelIdx->flags);
n += tPutU32v(p ? p + n : p, pDelIdx->nItem);
n += tPutBinary(p ? p + n : p, pDelIdx->pOffset, pDelIdx->nItem * tsdbOffsetSize(pDelIdx->flags));
n += tPutOffset(p ? p + n : p, &pDelIdx->offset);
n += tPutBinary(p ? p + n : p, pDelIdx->pData, pDelIdx->nData);
return n;
......@@ -215,11 +313,9 @@ int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx) {
int32_t n = 0;
n += tGetU32(p, &pDelIdx->delimiter);
n += tGetU8(p, &pDelIdx->flags);
n += tGetU32v(p, &pDelIdx->nItem);
n += tGetBinary(p, &pDelIdx->pOffset, NULL);
n += tGetBinary(p, &pDelIdx->pData, &pDelIdx->nData);
n += tGetU32(p + n, &pDelIdx->delimiter);
n += tGetOffset(p + n, &pDelIdx->offset);
n += tGetBinary(p + n, &pDelIdx->pData, &pDelIdx->nData);
return n;
}
......@@ -238,24 +334,61 @@ static FORCE_INLINE int32_t tPutDelDataItem(uint8_t *p, SDelDataItem *pItem) {
static FORCE_INLINE int32_t tGetDelDataItem(uint8_t *p, SDelDataItem *pItem) {
int32_t n = 0;
n += tGetI64v(p, &pItem->version);
n += tGetI64(p, &pItem->sKey);
n += tGetI64(p, &pItem->eKey);
n += tGetI64v(p + n, &pItem->version);
n += tGetI64(p + n, &pItem->sKey);
n += tGetI64(p + n, &pItem->eKey);
return n;
}
// SDelData ======================================================
int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem) {
int32_t code = 0;
uint32_t offset = pDelData->nData;
// offset
code = tsdbAddOffset(&pDelData->offset, offset);
if (code) goto _exit;
// alloc
pDelData->nData += tPutDelDataItem(NULL, pItem);
code = tsdbRealloc(&pDelData->pData, pDelData->nData);
if (code) goto _exit;
// put
tPutDelDataItem(pDelData->pData + offset, pItem);
_exit:
return code;
}
int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx) {
int32_t code = 0;
int32_t offset;
offset = tsdbGetOffset(&pDelData->offset, idx);
if (offset < 0) {
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
tGetDelDataItem(pDelData->pData + offset, pItem);
_exit:
return code;
}
int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version) {
int32_t code = 0;
int32_t lidx = 0;
int32_t ridx = pDelData->nItem - 1;
int32_t ridx = pDelData->offset.nOffset - 1;
int32_t midx;
while (lidx <= ridx) {
midx = (lidx + ridx) / 2;
tDelDataGetItemByIdx(pDelData, pItem, midx);
code = tDelDataGetItemByIdx(pDelData, pItem, midx);
if (code) goto _exit;
if (version == pItem->version) {
goto _exit;
} else if (version < pItem->version) {
......@@ -271,42 +404,13 @@ _exit:
return code;
}
int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx) {
tGetDelDataItem(pDelData->pData + tsdbGetOffset(pDelData->pOffset, pDelData->flags, idx), pItem);
return 0;
}
int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem) {
int32_t code = 0;
uint32_t nItem = pDelData->nItem;
uint32_t offset = pDelData->nData;
pDelData->nItem++;
pDelData->nData += tPutDelDataItem(NULL, pItem);
// alloc
code = tsdbRealloc(&pDelData->pOffset, pDelData->nItem * sizeof(uint32_t));
if (code) goto _exit;
code = tsdbRealloc(&pDelData->pData, pDelData->nData);
if (code) goto _exit;
// put
((uint32_t *)pDelData->pOffset)[nItem] = offset;
tPutDelDataItem(pDelData->pData + offset, pItem);
_exit:
return code;
}
int32_t tPutDelData(uint8_t *p, SDelData *pDelData) {
int32_t n = 0;
n += tPutU32(p ? p + n : p, pDelData->delimiter);
n += tPutI64(p ? p + n : p, pDelData->suid);
n += tPutI64(p ? p + n : p, pDelData->uid);
n += tPutU8(p ? p + n : p, pDelData->flags);
n += tPutU32v(p ? p + n : p, pDelData->nItem);
n += tPutBinary(p ? p + n : p, pDelData->pOffset, pDelData->nItem * tsdbOffsetSize(pDelData->flags));
n += tPutOffset(p ? p + n : p, &pDelData->offset);
n += tPutBinary(p ? p + n : p, pDelData->pData, pDelData->nData);
return n;
......@@ -315,13 +419,11 @@ int32_t tPutDelData(uint8_t *p, SDelData *pDelData) {
int32_t tGetDelData(uint8_t *p, SDelData *pDelData) {
int32_t n = 0;
n += tGetU32(p, &pDelData->delimiter);
n += tGetI64(p, &pDelData->suid);
n += tGetI64(p, &pDelData->uid);
n += tGetU8(p, &pDelData->flags);
n += tGetU32v(p, &pDelData->nItem);
n += tGetBinary(p, &pDelData->pOffset, NULL);
n += tGetBinary(p, &pDelData->pData, &pDelData->nData);
n += tGetU32(p + n, &pDelData->delimiter);
n += tGetI64(p + n, &pDelData->suid);
n += tGetI64(p + n, &pDelData->uid);
n += tGetOffset(p + n, &pDelData->offset);
n += tGetBinary(p + n, &pDelData->pData, &pDelData->nData);
return n;
}
......@@ -342,12 +444,12 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) {
int32_t n = 0;
n += tGetI64(p, &pDelFile->minKey);
n += tGetI64(p, &pDelFile->maxKey);
n += tGetI64v(p, &pDelFile->minVersion);
n += tGetI64v(p, &pDelFile->maxVersion);
n += tGetI64v(p, &pDelFile->size);
n += tGetI64v(p, &pDelFile->offset);
n += tGetI64(p + n, &pDelFile->minKey);
n += tGetI64(p + n, &pDelFile->maxKey);
n += tGetI64v(p + n, &pDelFile->minVersion);
n += tGetI64v(p + n, &pDelFile->maxVersion);
n += tGetI64v(p + n, &pDelFile->size);
n += tGetI64v(p + n, &pDelFile->offset);
return n;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册