提交 f928765d 编写于 作者: H hjxilinx

[TD-32] opt skiplist insertion performance for ordered data

上级 6dfca928
...@@ -51,28 +51,27 @@ static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) { ...@@ -51,28 +51,27 @@ static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) {
level = 1; level = 1;
pSkipList->level = 1; pSkipList->level = 1;
} else { } else {
if (level > pSkipList->level && pSkipList->level < pSkipList->maxLevel) { if (level > pSkipList->level) {
level = (++pSkipList->level); if (pSkipList->level < pSkipList->maxLevel) {
level = (++pSkipList->level);
} else {
level = pSkipList->level;
}
} }
} }
assert(level <= pSkipList->maxLevel);
return level; return level;
} }
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t level, SSkipListNode *pNode); #define DO_MEMSET_PTR_AREA(n) do {\
int32_t _l = (n)->level;\
memset(pNode, 0, SL_NODE_HEADER_SIZE(_l));\
(n)->level = _l;\
} while(0)
void tSkipListDoRecordPut(SSkipList *pSkipList) { static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode);
#if SKIP_LIST_RECORD_PERFORMANCE static SSkipListNode* tSkipListDoAppend(SSkipList *pSkipList, SSkipListNode *pNode);
const int32_t MAX_RECORD_NUM = 1000;
if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) {
pSkipList->state.nInsertObjs = 1;
pSkipList->state.nTotalStepsForInsert = 0;
pSkipList->state.nTotalElapsedTimeForInsert = 0;
} else {
pSkipList->state.nInsertObjs++;
}
#endif
}
int32_t compareInt32Val(const void *pLeft, const void *pRight) { int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight); int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight);
...@@ -142,28 +141,12 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) { ...@@ -142,28 +141,12 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
} }
int32_t compareStrVal(const void *pLeft, const void *pRight) { int32_t compareStrVal(const void *pLeft, const void *pRight) {
// SSkipListKey *pL = (SSkipListKey *)pLeft; int32_t ret = strcmp(pLeft, pRight);
// SSkipListKey *pR = (SSkipListKey *)pRight; if (ret == 0) {
// return 0;
// if (pL->nLen == 0 && pR->nLen == 0) { } else {
// return 0; return ret > 0 ? 1 : -1;
// } }
//
// // handle only one-side bound compare situation, there is only lower bound or only upper bound
// if (pL->nLen == -1) {
// return 1; // no lower bound, lower bound is minimum, always return -1;
// } else if (pR->nLen == -1) {
// return -1; // no upper bound, upper bound is maximum situation, always return 1;
// }
//
// int32_t ret = strcmp(((SSkipListKey *)pLeft)->pz, ((SSkipListKey *)pRight)->pz);
//
// if (ret == 0) {
// return 0;
// } else {
// return ret > 0 ? 1 : -1;
// }
return 0;
} }
int32_t compareWStrVal(const void *pLeft, const void *pRight) { int32_t compareWStrVal(const void *pLeft, const void *pRight) {
...@@ -280,6 +263,28 @@ static __compar_fn_t getKeyComparator(int32_t keyType) { ...@@ -280,6 +263,28 @@ static __compar_fn_t getKeyComparator(int32_t keyType) {
return comparFn; return comparFn;
} }
static bool initForwardBackwardPtr(SSkipList* pSkipList) {
uint32_t maxLevel = pSkipList->maxLevel;
// head info
pSkipList->pHead = (SSkipListNode *)calloc(1, SL_NODE_HEADER_SIZE(maxLevel) * 2);
if (pSkipList->pHead == NULL) {
return false;
}
pSkipList->pHead->level = pSkipList->maxLevel;
// tail info
pSkipList->pTail = (SSkipListNode*) ((char*) pSkipList->pHead + SL_NODE_HEADER_SIZE(maxLevel));
pSkipList->pTail->level = pSkipList->maxLevel;
for(int32_t i = 0; i < maxLevel; ++i) {
SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pSkipList->pTail;
SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pSkipList->pHead;
}
return true;
}
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock, SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock,
uint8_t freeNode, __sl_key_fn_t fn) { uint8_t freeNode, __sl_key_fn_t fn) {
SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList));
...@@ -291,21 +296,24 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui ...@@ -291,21 +296,24 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui
maxLevel = MAX_SKIP_LIST_LEVEL; maxLevel = MAX_SKIP_LIST_LEVEL;
} }
pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey, .freeNode = freeNode}; pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey, .freeNode = freeNode};
pSkipList->keyFn = fn; pSkipList->keyFn = fn;
pSkipList->comparFn = getKeyComparator(keyType); pSkipList->comparFn = getKeyComparator(keyType);
pSkipList->maxLevel = maxLevel; pSkipList->maxLevel = maxLevel;
pSkipList->level = 1; pSkipList->level = 1;
pSkipList->pHead = (SSkipListNode *)calloc(1, SL_NODE_HEADER_SIZE(maxLevel)); if (!initForwardBackwardPtr(pSkipList)) {
pSkipList->pHead->level = pSkipList->maxLevel; tfree(pSkipList);
return NULL;
}
if (lock) { if (lock) {
pSkipList->lock = calloc(1, sizeof(pthread_rwlock_t)); pSkipList->lock = calloc(1, sizeof(pthread_rwlock_t));
if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) { if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) {
tfree(pSkipList->pHead); tfree(pSkipList->pHead);
tfree(pSkipList); tfree(pSkipList);
return NULL; return NULL;
} }
} }
...@@ -349,7 +357,7 @@ void *tSkipListDestroy(SSkipList *pSkipList) { ...@@ -349,7 +357,7 @@ void *tSkipListDestroy(SSkipList *pSkipList) {
SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0);
while (pNode) { while (pNode != pSkipList->pTail) {
SSkipListNode *pTemp = pNode; SSkipListNode *pTemp = pNode;
pNode = SL_GET_FORWARD_POINTER(pNode, 0); pNode = SL_GET_FORWARD_POINTER(pNode, 0);
...@@ -358,8 +366,6 @@ void *tSkipListDestroy(SSkipList *pSkipList) { ...@@ -358,8 +366,6 @@ void *tSkipListDestroy(SSkipList *pSkipList) {
} }
} }
tfree(pSkipList->pHead);
if (pSkipList->lock) { if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock); pthread_rwlock_unlock(pSkipList->lock);
pthread_rwlock_destroy(pSkipList->lock); pthread_rwlock_destroy(pSkipList->lock);
...@@ -382,32 +388,34 @@ void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSi ...@@ -382,32 +388,34 @@ void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSi
} }
SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
if (pSkipList == NULL) { if (pSkipList == NULL || pNode == NULL) {
return NULL; return NULL;
} }
if (pSkipList->lock) { if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock); pthread_rwlock_wrlock(pSkipList->lock);
} }
// record one node is put into skiplist // the new key is greater than the last key of skiplist append it at last position
tSkipListDoRecordPut(pSkipList); char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode);
if (pSkipList->size == 0 || pSkipList->comparFn(pSkipList->lastKey, newDatakey) < 0) {
return tSkipListDoAppend(pSkipList, pNode);
}
// find the appropriated position to insert data
SSkipListNode *px = pSkipList->pHead; SSkipListNode *px = pSkipList->pHead;
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
bool identical = false; bool identical = false;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) { for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i); SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i);
while (p != NULL) { while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p); char *key = SL_GET_NODE_KEY(pSkipList, p);
char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode);
// if the forward element is less than the specified key, forward one step // if the forward element is less than the specified key, forward one step
int32_t ret = pSkipList->comparFn(key, newDatakey); int32_t ret = pSkipList->comparFn(key, newDatakey);
if (ret < 0) { if (ret < 0) {
px = p; px = p;
p = SL_GET_FORWARD_POINTER(px, i); p = SL_GET_FORWARD_POINTER(px, i);
} else { } else {
if (identical == false) { if (identical == false) {
...@@ -417,10 +425,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { ...@@ -417,10 +425,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
break; break;
} }
} }
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.nTotalStepsForInsert++;
#endif
forward[i] = px; forward[i] = px;
} }
...@@ -432,51 +437,61 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { ...@@ -432,51 +437,61 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
return forward[0]; return forward[0];
} }
#if SKIP_LIST_RECORD_PERFORMANCE
recordNodeEachLevel(pSkipList, level);
#endif
// clear pointer area
int32_t level = SL_GET_NODE_LEVEL(pNode);
memset(pNode, 0, SL_NODE_HEADER_SIZE(pNode->level));
pNode->level = level;
tSkipListDoInsert(pSkipList, forward, level, pNode); tSkipListDoInsert(pSkipList, forward, pNode);
atomic_add_fetch_32(&pSkipList->size, 1);
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, level);
#endif
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return pNode; return pNode;
} }
void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t level, SSkipListNode *pNode) { void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) {
for (int32_t i = 0; i < level; ++i) { DO_MEMSET_PTR_AREA(pNode);
for (int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode *x = forward[i]; SSkipListNode *x = forward[i];
if (x != NULL) {
// if (x != pSkipList->pTail) {
SL_GET_BACKWARD_POINTER(pNode, i) = x; SL_GET_BACKWARD_POINTER(pNode, i) = x;
SSkipListNode *pForward = SL_GET_FORWARD_POINTER(x, i); SSkipListNode *next = SL_GET_FORWARD_POINTER(x, i);
if (pForward) { // if (next) {
SL_GET_BACKWARD_POINTER(pForward, i) = pNode; SL_GET_BACKWARD_POINTER(next, i) = pNode;
} // }
SL_GET_FORWARD_POINTER(pNode, i) = SL_GET_FORWARD_POINTER(x, i); SL_GET_FORWARD_POINTER(pNode, i) = next;
SL_GET_FORWARD_POINTER(x, i) = pNode; SL_GET_FORWARD_POINTER(x, i) = pNode;
} else { // } else {
SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; // SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode;
// SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead); // }
} }
atomic_add_fetch_32(&pSkipList->size, 1);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
} }
} }
SSkipListNode* tSkipListDoAppend(SSkipList *pSkipList, SSkipListNode *pNode) {
// do clear pointer area
DO_MEMSET_PTR_AREA(pNode);
for(int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pSkipList->pTail, i);
SL_GET_FORWARD_POINTER(prev, i) = pNode;
SL_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail;
SL_GET_BACKWARD_POINTER(pNode, i) = prev;
SL_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode;
}
pSkipList->lastKey = SL_GET_NODE_KEY(pSkipList, pNode);
atomic_add_fetch_32(&pSkipList->size, 1);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return pNode;
}
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) { SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
int32_t sLevel = pSkipList->level - 1; int32_t sLevel = pSkipList->level - 1;
int32_t ret = -1; int32_t ret = -1;
...@@ -766,11 +781,18 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { ...@@ -766,11 +781,18 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
int32_t id = 1; int32_t id = 1;
char* prev = NULL;
while (p) {
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p); char *key = SL_GET_NODE_KEY(pSkipList, p);
if (prev != NULL) {
assert(pSkipList->comparFn(prev, key) < 0);
}
switch (pSkipList->keyInfo.type) { switch (pSkipList->keyInfo.type) {
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
fprintf(stdout, "%d: %d\n", id++, *(int32_t *)key);
break;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
...@@ -786,7 +808,8 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { ...@@ -786,7 +808,8 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
fprintf(stdout, "\n"); fprintf(stdout, "\n");
} }
prev = SL_GET_NODE_KEY(pSkipList, p);
p = SL_GET_FORWARD_POINTER(p, nlevel - 1); p = SL_GET_FORWARD_POINTER(p, nlevel - 1);
// p = p->pForward[nlevel - 1];
} }
} }
...@@ -149,8 +149,8 @@ int main(int argc, char** argv) { ...@@ -149,8 +149,8 @@ int main(int argc, char** argv) {
} }
TEST(testCase, hashTest) { TEST(testCase, hashTest) {
simpleTest(); // simpleTest();
stringKeyTest(); // stringKeyTest();
noLockPerformanceTest(); // noLockPerformanceTest();
multithreadsTest(); // multithreadsTest();
} }
\ No newline at end of file
...@@ -69,6 +69,41 @@ void doubleSkipListTest() { ...@@ -69,6 +69,41 @@ void doubleSkipListTest() {
tSkipListDestroy(pSkipList); tSkipListDestroy(pSkipList);
} }
void randKeyTest() {
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_INT, sizeof(int32_t), 0, false, true, getkey);
int32_t size = 200000;
srand(time(NULL));
printf("generated %d keys is: \n", size);
for (int32_t i = 0; i < size; ++i) {
int32_t level = 0;
int32_t s = 0;
tSkipListRandNodeInfo(pSkipList, &level, &s);
auto d = (SSkipListNode*)calloc(1, s + sizeof(int32_t) * 2);
d->level = level;
int32_t* key = (int32_t*)SL_GET_NODE_KEY(pSkipList, d);
key[0] = rand() % 1000000000;
key[1] = key[0];
tSkipListPut(pSkipList, d);
}
printf("the first level of skip list is:\n");
tSkipListPrint(pSkipList, 1);
printf("the sec level of skip list is:\n");
tSkipListPrint(pSkipList, 2);
printf("the 5 level of skip list is:\n");
tSkipListPrint(pSkipList, 5);
tSkipListDestroy(pSkipList);
}
void stringKeySkiplistTest() { void stringKeySkiplistTest() {
const int32_t max_key_size = 12; const int32_t max_key_size = 12;
...@@ -100,6 +135,9 @@ void stringKeySkiplistTest() { ...@@ -100,6 +135,9 @@ void stringKeySkiplistTest() {
tSkipListPut(pSkipList, pNode); tSkipListPut(pSkipList, pNode);
printf("level one------------------\n");
tSkipListPrint(pSkipList, 1);
#if 0 #if 0
SSkipListNode **pRes = NULL; SSkipListNode **pRes = NULL;
int32_t ret = tSkipListGets(pSkipList, &key1, &pRes); int32_t ret = tSkipListGets(pSkipList, &key1, &pRes);
...@@ -139,6 +177,9 @@ void stringKeySkiplistTest() { ...@@ -139,6 +177,9 @@ void stringKeySkiplistTest() {
int64_t e = taosGetTimestampUs(); int64_t e = taosGetTimestampUs();
printf("elapsed time:%lld us to insert %d data, avg:%f us\n", (e - s), total, (double)(e - s) / total); printf("elapsed time:%lld us to insert %d data, avg:%f us\n", (e - s), total, (double)(e - s) / total);
printf("level two------------------\n");
tSkipListPrint(pSkipList, 1);
#if 0 #if 0
SSkipListNode **pres = NULL; SSkipListNode **pres = NULL;
...@@ -167,7 +208,7 @@ void stringKeySkiplistTest() { ...@@ -167,7 +208,7 @@ void stringKeySkiplistTest() {
void skiplistPerformanceTest() { void skiplistPerformanceTest() {
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, false, getkey); SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, false, getkey);
int32_t size = 900000; int32_t size = 1000000;
int64_t prev = taosGetTimestampMs(); int64_t prev = taosGetTimestampMs();
int64_t s = prev; int64_t s = prev;
...@@ -179,7 +220,7 @@ void skiplistPerformanceTest() { ...@@ -179,7 +220,7 @@ void skiplistPerformanceTest() {
char* total = (char*)calloc(1, unit * size); char* total = (char*)calloc(1, unit * size);
char* p = total; char* p = total;
for (int32_t i = size; i > 0; --i) { for (int32_t i = 0; i < size; ++i) {
tSkipListRandNodeInfo(pSkipList, &level, &headsize); tSkipListRandNodeInfo(pSkipList, &level, &headsize);
SSkipListNode* d = (SSkipListNode*)p; SSkipListNode* d = (SSkipListNode*)p;
...@@ -207,19 +248,19 @@ void skiplistPerformanceTest() { ...@@ -207,19 +248,19 @@ void skiplistPerformanceTest() {
assert(tSkipListGetSize(pSkipList) == size); assert(tSkipListGetSize(pSkipList) == size);
printf("the level of skiplist is:\n"); // printf("the level of skiplist is:\n");
//
// printf("level two------------------\n"); // printf("level two------------------\n");
// tSkipListPrint(pSkipList, 2); // tSkipListPrint(pSkipList, 2);
// //
// printf("level three------------------\n"); // printf("level three------------------\n");
// tSkipListPrint(pSkipList, 3); // tSkipListPrint(pSkipList, 3);
// //
// printf("level four------------------\n"); // printf("level four------------------\n");
// tSkipListPrint(pSkipList, 4); // tSkipListPrint(pSkipList, 4);
// //
// printf("level nine------------------\n"); // printf("level nine------------------\n");
// tSkipListPrint(pSkipList, 10); // tSkipListPrint(pSkipList, 10);
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
#if 0 #if 0
...@@ -275,10 +316,11 @@ TEST(testCase, skiplist_test) { ...@@ -275,10 +316,11 @@ TEST(testCase, skiplist_test) {
assert(sizeof(SSkipListKey) == 8); assert(sizeof(SSkipListKey) == 8);
srand(time(NULL)); srand(time(NULL));
// stringKeySkiplistTest(); stringKeySkiplistTest();
// doubleSkipListTest(); doubleSkipListTest();
skiplistPerformanceTest(); skiplistPerformanceTest();
// duplicatedKeyTest(); duplicatedKeyTest();
randKeyTest();
// tSKipListQueryCond q; // tSKipListQueryCond q;
// q.upperBndRelOptr = true; // q.upperBndRelOptr = true;
...@@ -310,5 +352,4 @@ TEST(testCase, skiplist_test) { ...@@ -310,5 +352,4 @@ TEST(testCase, skiplist_test) {
tfree(pNodes); tfree(pNodes);
free(pKeys);*/ free(pKeys);*/
getchar();
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册