未验证 提交 31ccb016 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19336 from taosdata/fix/TS-2371

fix: skiplist concurrent access
...@@ -772,8 +772,8 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { ...@@ -772,8 +772,8 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
return 0; return 0;
} }
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) // #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) // #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
......
...@@ -19,9 +19,13 @@ ...@@ -19,9 +19,13 @@
#define SL_MAX_LEVEL 5 #define SL_MAX_LEVEL 5
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2 // sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4)) #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_GET_NODE_FORWARD(n, l) ((SMemSkipListNode *)atomic_load_64((int64_t *)&SL_NODE_FORWARD(n, l)))
#define SL_GET_NODE_BACKWARD(n, l) ((SMemSkipListNode *)atomic_load_64((int64_t *)&SL_NODE_BACKWARD(n, l)))
#define SL_SET_NODE_FORWARD(n, l, p) atomic_store_64((int64_t *)&SL_NODE_FORWARD(n, l), (int64_t)(p))
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_64((int64_t *)&SL_NODE_BACKWARD(n, l), (int64_t)(p))
#define SL_MOVE_BACKWARD 0x1 #define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2 #define SL_MOVE_FROM_POS 0x2
...@@ -246,18 +250,18 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa ...@@ -246,18 +250,18 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
if (pFrom == NULL) { if (pFrom == NULL) {
// create from head or tail // create from head or tail
if (backward) { if (backward) {
pIter->pNode = SL_NODE_BACKWARD(pTbData->sl.pTail, 0); pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
} else { } else {
pIter->pNode = SL_NODE_FORWARD(pTbData->sl.pHead, 0); pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
} }
} else { } else {
// create from a key // create from a key
if (backward) { if (backward) {
tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD); tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
pIter->pNode = SL_NODE_BACKWARD(pos[0], 0); pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
} else { } else {
tbDataMovePosTo(pTbData, pos, pFrom, 0); tbDataMovePosTo(pTbData, pos, pFrom, 0);
pIter->pNode = SL_NODE_FORWARD(pos[0], 0); pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
} }
} }
} }
...@@ -271,7 +275,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { ...@@ -271,7 +275,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return false; return false;
} }
pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0); pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
if (pIter->pNode == pIter->pTbData->sl.pHead) { if (pIter->pNode == pIter->pTbData->sl.pHead) {
return false; return false;
} }
...@@ -282,7 +286,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { ...@@ -282,7 +286,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return false; return false;
} }
pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0); pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
if (pIter->pNode == pIter->pTbData->sl.pTail) { if (pIter->pNode == pIter->pTbData->sl.pTail) {
return false; return false;
} }
...@@ -335,7 +339,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid ...@@ -335,7 +339,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel; int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
ASSERT(pPool != NULL); ASSERT(pPool != NULL);
pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2); pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
if (pTbData == NULL) { if (pTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -408,7 +412,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p ...@@ -408,7 +412,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
if (fromPos) px = pos[pTbData->sl.level - 1]; if (fromPos) px = pos[pTbData->sl.level - 1];
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_BACKWARD(px, iLevel); pn = SL_GET_NODE_BACKWARD(px, iLevel);
while (pn != pTbData->sl.pHead) { while (pn != pTbData->sl.pHead) {
tKey.version = pn->version; tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts; tKey.ts = pn->pTSRow->ts;
...@@ -418,7 +422,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p ...@@ -418,7 +422,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
break; break;
} else { } else {
px = pn; px = pn;
pn = SL_NODE_BACKWARD(px, iLevel); pn = SL_GET_NODE_BACKWARD(px, iLevel);
} }
} }
...@@ -438,7 +442,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p ...@@ -438,7 +442,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
if (fromPos) px = pos[pTbData->sl.level - 1]; if (fromPos) px = pos[pTbData->sl.level - 1];
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_FORWARD(px, iLevel); pn = SL_GET_NODE_FORWARD(px, iLevel);
while (pn != pTbData->sl.pTail) { while (pn != pTbData->sl.pTail) {
tKey.version = pn->version; tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts; tKey.ts = pn->pTSRow->ts;
...@@ -448,7 +452,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p ...@@ -448,7 +452,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
break; break;
} else { } else {
px = pn; px = pn;
pn = SL_NODE_FORWARD(px, iLevel); pn = SL_GET_NODE_FORWARD(px, iLevel);
} }
} }
...@@ -474,58 +478,53 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN ...@@ -474,58 +478,53 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
int8_t level; int8_t level;
SMemSkipListNode *pNode; SMemSkipListNode *pNode;
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
int64_t nSize;
// node // create node
level = tsdbMemSkipListRandLevel(&pTbData->sl); level = tsdbMemSkipListRandLevel(&pTbData->sl);
ASSERT(pPool != NULL); nSize = SL_NODE_SIZE(level);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level)); pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->len);
if (pNode == NULL) { if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
pNode->level = level; pNode->level = level;
pNode->version = version; pNode->version = version;
pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len); pNode->pTSRow = (STSRow *)((char *)pNode + nSize);
if (NULL == pNode->pTSRow) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
memcpy(pNode->pTSRow, pRow, pRow->len); memcpy(pNode->pTSRow, pRow, pRow->len);
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { // set node
SMemSkipListNode *pn = pos[iLevel]; if (forward) {
SMemSkipListNode *px; for (int8_t iLevel = 0; iLevel < level; iLevel++) {
SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
if (forward) { SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel];
px = SL_NODE_FORWARD(pn, iLevel); }
} else {
SL_NODE_BACKWARD(pNode, iLevel) = pn; for (int8_t iLevel = 0; iLevel < level; iLevel++) {
SL_NODE_FORWARD(pNode, iLevel) = px; SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel];
} else { SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
px = SL_NODE_BACKWARD(pn, iLevel);
SL_NODE_BACKWARD(pNode, iLevel) = px;
SL_NODE_FORWARD(pNode, iLevel) = pn;
} }
} }
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { // set forward and backward
SMemSkipListNode *pn = pos[iLevel]; if (forward) {
SMemSkipListNode *px; for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel];
if (forward) { SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode);
px = SL_NODE_FORWARD(pn, iLevel); SL_SET_NODE_BACKWARD(pNext, iLevel, pNode);
SL_NODE_FORWARD(pn, iLevel) = pNode;
SL_NODE_BACKWARD(px, iLevel) = pNode;
} else {
px = SL_NODE_BACKWARD(pn, iLevel);
SL_NODE_FORWARD(px, iLevel) = pNode; pos[iLevel] = pNode;
SL_NODE_BACKWARD(pn, iLevel) = pNode;
} }
} else {
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel];
SL_SET_NODE_FORWARD(pPrev, iLevel, pNode);
SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode);
pos[iLevel] = pNode; pos[iLevel] = pNode;
}
} }
pTbData->sl.size++; pTbData->sl.size++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册