提交 5b51f97a 编写于 作者: H Hongze Cheng

more optimize

上级 3bc41d37
...@@ -108,8 +108,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) { ...@@ -108,8 +108,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)}) #define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)}) #define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)})
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)}) #define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); // int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tsdbRowCmprFn(const void *p1, const void *p2); int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// SRowIter // SRowIter
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
...@@ -333,6 +333,8 @@ struct SVersionRange { ...@@ -333,6 +333,8 @@ struct SVersionRange {
typedef struct SMemSkipListNode SMemSkipListNode; typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode { struct SMemSkipListNode {
int8_t level; int8_t level;
int64_t version;
STSRow *pTSRow;
SMemSkipListNode *forwards[0]; SMemSkipListNode *forwards[0];
}; };
typedef struct SMemSkipList { typedef struct SMemSkipList {
...@@ -772,14 +774,6 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { ...@@ -772,14 +774,6 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
static FORCE_INLINE int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = tGetI64(p, &pRow->version);
pRow->pTSRow = (STSRow *)(p + n);
n += pRow->pTSRow->len;
return n;
}
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
...@@ -798,8 +792,9 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { ...@@ -798,8 +792,9 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
} }
} }
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
pIter->pRow = &pIter->row; pIter->pRow = &pIter->row;
pIter->pRow->version = pIter->pNode->version;
pIter->pRow->pTSRow = pIter->pNode->pTSRow;
return pIter->pRow; return pIter->pRow;
} }
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_MOVE_BACKWARD 0x1 #define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2 #define SL_MOVE_FROM_POS 0x2
...@@ -394,7 +393,7 @@ _err: ...@@ -394,7 +393,7 @@ _err:
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) { static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
SMemSkipListNode *px; SMemSkipListNode *px;
SMemSkipListNode *pn; SMemSkipListNode *pn;
TSDBKEY *pTKey; TSDBKEY tKey = {0};
int32_t backward = flags & SL_MOVE_BACKWARD; int32_t backward = flags & SL_MOVE_BACKWARD;
int32_t fromPos = flags & SL_MOVE_FROM_POS; int32_t fromPos = flags & SL_MOVE_FROM_POS;
...@@ -413,9 +412,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p ...@@ -413,9 +412,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_BACKWARD(px, iLevel); pn = SL_NODE_BACKWARD(px, iLevel);
while (pn != pTbData->sl.pHead) { while (pn != pTbData->sl.pHead) {
pTKey = (TSDBKEY *)SL_NODE_DATA(pn); tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
int32_t c = tsdbKeyCmprFn(pTKey, pKey); int32_t c = tsdbKeyCmprFn(&tKey, pKey);
if (c <= 0) { if (c <= 0) {
break; break;
} else { } else {
...@@ -442,7 +442,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p ...@@ -442,7 +442,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_FORWARD(px, iLevel); pn = SL_NODE_FORWARD(px, iLevel);
while (pn != pTbData->sl.pTail) { while (pn != pTbData->sl.pTail) {
int32_t c = tsdbKeyCmprFn(SL_NODE_DATA(pn), pKey); tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
if (c >= 0) { if (c >= 0) {
break; break;
} else { } else {
...@@ -467,8 +470,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { ...@@ -467,8 +470,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
return level; return level;
} }
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow, static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version,
int8_t forward) { STSRow *pRow, int8_t forward) {
int32_t code = 0; int32_t code = 0;
int8_t level; int8_t level;
SMemSkipListNode *pNode; SMemSkipListNode *pNode;
...@@ -477,13 +480,19 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN ...@@ -477,13 +480,19 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
// node // node
level = tsdbMemSkipListRandLevel(&pTbData->sl); level = tsdbMemSkipListRandLevel(&pTbData->sl);
ASSERT(pPool != NULL); ASSERT(pPool != NULL);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level));
if (pNode == NULL) { if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
pNode->level = level; pNode->level = level;
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow); pNode->version = version;
pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len);
if (NULL == pNode->pTSRow) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
memcpy(pNode->pTSRow, pRow, pRow->len);
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pn = pos[iLevel]; SMemSkipListNode *pn = pos[iLevel];
...@@ -549,7 +558,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i ...@@ -549,7 +558,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
key.ts = row.pTSRow->ts; key.ts = row.pTSRow->ts;
nRow++; nRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 0); code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0);
if (code) { if (code) {
goto _err; goto _err;
} }
...@@ -570,7 +579,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i ...@@ -570,7 +579,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) { if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
} }
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1); code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1);
if (code) { if (code) {
goto _err; goto _err;
} }
......
...@@ -565,15 +565,15 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * ...@@ -565,15 +565,15 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
} }
} }
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) { // int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = 0; // int32_t n = 0;
n += tPutI64(p, pRow->version); // n += tPutI64(p, pRow->version);
if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len); // if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len);
n += pRow->pTSRow->len; // n += pRow->pTSRow->len;
return n; // return n;
} // }
int32_t tsdbRowCmprFn(const void *p1, const void *p2) { int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2)); return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
...@@ -1084,7 +1084,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch ...@@ -1084,7 +1084,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE; cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
cv.value.nData = varDataLen(pData); cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData); cv.value.pData = varDataVal(pData);
} else { } else {
...@@ -1106,7 +1106,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch ...@@ -1106,7 +1106,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE; cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) { if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
cv.value.nData = varDataLen(pData); cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData); cv.value.pData = varDataVal(pData);
} else { } else {
...@@ -1151,7 +1151,7 @@ static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSch ...@@ -1151,7 +1151,7 @@ static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSch
ASSERT(pTColumn->type == pColData->type); ASSERT(pTColumn->type == pColData->type);
SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type}; SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type};
TDRowValT vt = TD_VTYPE_NONE; // default is NONE TDRowValT vt = TD_VTYPE_NONE; // default is NONE
SKvRowIdx *pKvIdx = NULL; SKvRowIdx *pKvIdx = NULL;
while (kvIter < nKvCols) { while (kvIter < nKvCols) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册