未验证 提交 fd6c41eb 编写于 作者: H hzcheng 提交者: GitHub

Merge pull request #1464 from taosdata/liaohj_2

Liaohj 2
...@@ -51,7 +51,7 @@ typedef struct SSkipListNode { ...@@ -51,7 +51,7 @@ typedef struct SSkipListNode {
#define SL_GET_BACKWARD_POINTER(n, _l) \ #define SL_GET_BACKWARD_POINTER(n, _l) \
((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)] ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)]
#define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level)) #define SL_GET_NODE_DATA(n) ((char *)(n) + SL_NODE_HEADER_SIZE((n)->level))
#define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n))) #define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n)))
#define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n)) #define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n))
...@@ -106,7 +106,8 @@ typedef struct tSkipListState { ...@@ -106,7 +106,8 @@ typedef struct tSkipListState {
typedef struct SSkipListKeyInfo { typedef struct SSkipListKeyInfo {
uint8_t dupKey : 2; // if allow duplicated key in the skip list uint8_t dupKey : 2; // if allow duplicated key in the skip list
uint8_t type : 6; // key type uint8_t type : 4; // key type
uint8_t freeNode:2; // free node when destroy the skiplist
uint8_t len; // maximum key length, used in case of string key uint8_t len; // maximum key length, used in case of string key
} SSkipListKeyInfo; } SSkipListKeyInfo;
...@@ -117,14 +118,13 @@ typedef struct SSkipList { ...@@ -117,14 +118,13 @@ typedef struct SSkipList {
uint8_t maxLevel; uint8_t maxLevel;
uint8_t level; uint8_t level;
SSkipListKeyInfo keyInfo; SSkipListKeyInfo keyInfo;
pthread_rwlock_t *lock; pthread_rwlock_t *lock;
SSkipListNode * pHead; SSkipListNode * pHead; // point to the first element
SSkipListNode * pTail; // point to the last element
void * lastKey; // last key in the skiplist
#if SKIP_LIST_RECORD_PERFORMANCE #if SKIP_LIST_RECORD_PERFORMANCE
tSkipListState state; // skiplist state tSkipListState state; // skiplist state
#endif #endif
} SSkipList; } SSkipList;
/* /*
...@@ -147,7 +147,7 @@ typedef struct SSkipListIterator { ...@@ -147,7 +147,7 @@ typedef struct SSkipListIterator {
* @return * @return
*/ */
SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t threadsafe, SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t threadsafe,
__sl_key_fn_t fn); uint8_t freeNode, __sl_key_fn_t fn);
/** /**
* *
...@@ -182,21 +182,28 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); ...@@ -182,21 +182,28 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode);
* @param keyType * @param keyType
* @return * @return
*/ */
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType); SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType);
/** /**
* get the size of skip list * get the size of skip list
* @param pSkipList * @param pSkipList
* @return * @return
*/ */
size_t tSkipListGetSize(const SSkipList* pSkipList); size_t tSkipListGetSize(const SSkipList *pSkipList);
/**
* display skip list of the given level, for debug purpose only
* @param pSkipList
* @param nlevel
*/
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel);
/** /**
* create skiplist iterator * create skiplist iterator
* @param pSkipList * @param pSkipList
* @return * @return
*/ */
SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList); SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList);
/** /**
* forward the skip list iterator * forward the skip list iterator
...@@ -217,7 +224,7 @@ SSkipListNode *tSkipListIterGet(SSkipListIterator *iter); ...@@ -217,7 +224,7 @@ SSkipListNode *tSkipListIterGet(SSkipListIterator *iter);
* @param iter * @param iter
* @return * @return
*/ */
void* tSkipListDestroyIter(SSkipListIterator* iter); void *tSkipListDestroyIter(SSkipListIterator *iter);
/* /*
* remove only one node of the pKey value. * remove only one node of the pKey value.
...@@ -234,7 +241,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey); ...@@ -234,7 +241,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey);
*/ */
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -281,7 +281,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) { ...@@ -281,7 +281,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) {
} }
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock, SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock,
__sl_key_fn_t fn) { uint8_t freeNode, __sl_key_fn_t fn) {
SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList));
if (pSkipList == NULL) { if (pSkipList == NULL) {
return NULL; return NULL;
...@@ -291,9 +291,8 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui ...@@ -291,9 +291,8 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui
maxLevel = MAX_SKIP_LIST_LEVEL; maxLevel = MAX_SKIP_LIST_LEVEL;
} }
pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey}; pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey, .freeNode = freeNode};
pSkipList->keyFn = fn; pSkipList->keyFn = fn;
pSkipList->comparFn = getKeyComparator(keyType); pSkipList->comparFn = getKeyComparator(keyType);
pSkipList->maxLevel = maxLevel; pSkipList->maxLevel = maxLevel;
pSkipList->level = 1; pSkipList->level = 1;
...@@ -348,13 +347,16 @@ void *tSkipListDestroy(SSkipList *pSkipList) { ...@@ -348,13 +347,16 @@ void *tSkipListDestroy(SSkipList *pSkipList) {
pthread_rwlock_wrlock(pSkipList->lock); pthread_rwlock_wrlock(pSkipList->lock);
} }
SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); // pSkipList->pHead.pForward[0]; SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0);
while (pNode) { while (pNode) {
SSkipListNode *pTemp = pNode; SSkipListNode *pTemp = pNode;
pNode = SL_GET_FORWARD_POINTER(pNode, 0); pNode = SL_GET_FORWARD_POINTER(pNode, 0);
if (pSkipList->keyInfo.freeNode) {
tfree(pTemp); tfree(pTemp);
} }
}
tfree(pSkipList->pHead); tfree(pSkipList->pHead);
...@@ -435,7 +437,11 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { ...@@ -435,7 +437,11 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
recordNodeEachLevel(pSkipList, level); recordNodeEachLevel(pSkipList, level);
#endif #endif
// clear pointer area
int32_t level = SL_GET_NODE_LEVEL(pNode); int32_t level = SL_GET_NODE_LEVEL(pNode);
memset(pNode, 0, SL_NODE_HEADER_SIZE(pNode->level));
pNode->level = level;
tSkipListDoInsert(pSkipList, forward, level, pNode); tSkipListDoInsert(pSkipList, forward, level, pNode);
atomic_add_fetch_32(&pSkipList->size, 1); atomic_add_fetch_32(&pSkipList->size, 1);
...@@ -691,89 +697,6 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) { ...@@ -691,89 +697,6 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) {
// return NULL; // return NULL;
//} //}
// //
// int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *),
// void *param) {
// (*pRes) = (SSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize);
// if (NULL == *pRes) {
// pError("error skiplist %p, malloc failed", pSkipList);
// return -1;
// }
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// SSkipListNode *pStartNode = pSkipList->pHead.pForward[0];
// int32_t num = 0;
//
// for (int32_t i = 0; i < pSkipList->nSize; ++i) {
// if (pStartNode == NULL) {
// pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1);
//#ifdef _DEBUG_VIEW
// tSkipListPrint(pSkipList, 1);
//#endif
// break;
// }
//
// if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) {
// (*pRes)[num++] = pStartNode;
// }
//
// pStartNode = pStartNode->pForward[0];
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
//
// if (num == 0) {
// free(*pRes);
// *pRes = NULL;
// } else if (num < pSkipList->nSize) { // free unused memory
// char *tmp = realloc((*pRes), num * POINTER_BYTES);
// assert(tmp != NULL);
//
// *pRes = (SSkipListNode **)tmp;
// }
//
// return num;
//}
//
// int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter) {
// if (pSkipList == NULL) {
// return -1;
// }
//
// iter->pSkipList = pSkipList;
// if (pSkipList->lock) {
// pthread_rwlock_rdlock(&pSkipList->lock);
// }
// iter->cur = NULL; // pSkipList->pHead.pForward[0];
// iter->num = pSkipList->size;
//
// if (pSkipList->lock) {
// pthread_rwlock_unlock(&pSkipList->lock);
// }
//
// return 0;
//}
//
// bool tSkipListIteratorNext(SSkipListIterator *iter) {
// if (iter->num == 0 || iter->pSkipList == NULL) {
// return false;
// }
//
// SSkipList *pSkipList = iter->pSkipList;
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// if (iter->cur == NULL) {
// iter->cur = pSkipList->pHead.pForward[0];
// } else {
// iter->cur = iter->cur->pForward[0];
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return iter->cur != NULL;
//}
//
// SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter) { return iter->cur; }
//
// int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) { // int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) {
// pSkipList->state.queryCount++; // pSkipList->state.queryCount++;
// SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr); // SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
...@@ -841,6 +764,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { ...@@ -841,6 +764,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
} }
SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
int32_t id = 1; int32_t id = 1;
while (p) { while (p) {
......
...@@ -125,7 +125,7 @@ TEST(testCase, cache_resize_test) { ...@@ -125,7 +125,7 @@ TEST(testCase, cache_resize_test) {
} }
uint64_t endTime = taosGetTimestampUs(); uint64_t endTime = taosGetTimestampUs();
printf("add 10,000,000 object cost:%lld us, avg:%f us\n", endTime - startTime, (endTime-startTime)/(double)num); printf("add %d object cost:%lld us, avg:%f us\n", num, endTime - startTime, (endTime-startTime)/(double)num);
startTime = taosGetTimestampUs(); startTime = taosGetTimestampUs();
for(int32_t i = 0; i < num; ++i) { for(int32_t i = 0; i < num; ++i) {
...@@ -134,7 +134,7 @@ TEST(testCase, cache_resize_test) { ...@@ -134,7 +134,7 @@ TEST(testCase, cache_resize_test) {
assert(k != 0); assert(k != 0);
} }
endTime = taosGetTimestampUs(); endTime = taosGetTimestampUs();
printf("retrieve 10,000,000 object cost:%lld us,avg:%f\n", endTime - startTime, (endTime - startTime)/(double)num); printf("retrieve %d object cost:%lld us,avg:%f\n", num, endTime - startTime, (endTime - startTime)/(double)num);
taosCacheCleanup(pCache); taosCacheCleanup(pCache);
taosMsleep(20000); taosMsleep(20000);
......
#include <gtest/gtest.h>
#include <limits.h>
#include <taosdef.h>
#include <iostream>
#include "taosmsg.h"
#include "tskiplist.h"
#include "ttime.h"
#include "tutil.h"
namespace {
char* getkey(const void* data) { return (char*)(data); }
void doubleSkipListTest() {
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, true, getkey);
double doubleVal[1000] = {0};
int32_t size = 20000;
printf("generated %d keys is: \n", size);
for (int32_t i = 0; i < size; ++i) {
if (i < 1000) {
doubleVal[i] = i * 0.997;
}
int32_t level = 0;
int32_t size = 0;
tSkipListRandNodeInfo(pSkipList, &level, &size);
auto d = (SSkipListNode*)calloc(1, size + sizeof(double) * 2);
d->level = level;
double* key = (double*)SL_GET_NODE_KEY(pSkipList, d);
key[0] = i * 0.997;
key[1] = i * 0.997;
tSkipListPut(pSkipList, d);
}
printf("the first level of skip list is:\n");
tSkipListPrint(pSkipList, 1);
#if 0
SSkipListNode **pNodes = NULL;
SSkipListKey sk;
for (int32_t i = 0; i < 100; ++i) {
sk.nType = TSDB_DATA_TYPE_DOUBLE;
int32_t idx = abs((i * rand()) % 1000);
sk.dKey = doubleVal[idx];
int32_t size = tSkipListGets(pSkipList, &sk, &pNodes);
printf("the query result size is: %d\n", size);
for (int32_t j = 0; j < size; ++j) {
printf("the result is: %lf\n", pNodes[j]->key.dKey);
}
if (size > 0) {
tfree(pNodes);
}
}
#endif
printf("double test end...\n");
tSkipListDestroy(pSkipList);
}
void stringKeySkiplistTest() {
const int32_t max_key_size = 12;
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_BINARY, max_key_size, 0, false, true, getkey);
int32_t level = 0;
int32_t headsize = 0;
tSkipListRandNodeInfo(pSkipList, &level, &headsize);
auto pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double));
pNode->level = level;
char* d = SL_GET_NODE_DATA(pNode);
strncpy(d, "nyse", 5);
*(double*)(d + max_key_size) = 12;
tSkipListPut(pSkipList, pNode);
tSkipListRandNodeInfo(pSkipList, &level, &headsize);
pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double));
pNode->level = level;
d = SL_GET_NODE_DATA(pNode);
strncpy(d, "beijing", 8);
*(double*)(d + max_key_size) = 911;
tSkipListPut(pSkipList, pNode);
#if 0
SSkipListNode **pRes = NULL;
int32_t ret = tSkipListGets(pSkipList, &key1, &pRes);
assert(ret == 1);
assert(strcmp(pRes[0]->key.pz, "beijing") == 0);
assert(pRes[0]->key.nType == TSDB_DATA_TYPE_BINARY);
tSkipListDestroyKey(&key1);
tSkipListDestroyKey(&key);
tSkipListDestroy(pSkipList);
free(pRes);
#endif
tSkipListDestroy(pSkipList);
int64_t s = taosGetTimestampUs();
pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_BINARY, 20, 0, false, true, getkey);
char k[256] = {0};
int32_t total = 10000;
for (int32_t i = 0; i < total; ++i) {
int32_t n = sprintf(k, "abc_%d_%d", i, i);
tSkipListRandNodeInfo(pSkipList, &level, &headsize);
auto pNode = (SSkipListNode*)calloc(1, headsize + 20 + sizeof(double));
pNode->level = level;
char* d = SL_GET_NODE_DATA(pNode);
strncpy(d, k, strlen(k));
tSkipListPut(pSkipList, pNode);
}
int64_t e = taosGetTimestampUs();
printf("elapsed time:%lld us to insert %d data, avg:%f us\n", (e - s), total, (double)(e - s) / total);
#if 0
SSkipListNode **pres = NULL;
s = taosGetTimestampMs();
for (int32_t j = 0; j < total; ++j) {
int32_t n = sprintf(k, "abc_%d_%d", j, j);
key = tSkipListCreateKey(TSDB_DATA_TYPE_BINARY, k, n);
int32_t num = tSkipListGets(pSkipList, &key, &pres);
assert(num > 0);
// tSkipListRemove(pSkipList, &key);
tSkipListRemoveNode(pSkipList, pres[0]);
if (num > 0) {
tfree(pres);
}
}
e = taosGetTimestampMs();
printf("elapsed time:%lldms\n", e - s);
#endif
tSkipListDestroy(pSkipList);
}
void skiplistPerformanceTest() {
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, false, getkey);
int32_t size = 900000;
int64_t prev = taosGetTimestampMs();
int64_t s = prev;
int32_t level = 0;
int32_t headsize = 0;
int32_t unit = MAX_SKIP_LIST_LEVEL * POINTER_BYTES * 2 + sizeof(double) * 2 + sizeof(int16_t);
char* total = (char*)calloc(1, unit * size);
char* p = total;
for (int32_t i = size; i > 0; --i) {
tSkipListRandNodeInfo(pSkipList, &level, &headsize);
SSkipListNode* d = (SSkipListNode*)p;
p += headsize + sizeof(double) * 2;
d->level = level;
double* v = (double*)SL_GET_NODE_DATA(d);
v[0] = i * 0.997;
v[1] = i * 0.997;
tSkipListPut(pSkipList, d);
if (i % 100000 == 0) {
int64_t cur = taosGetTimestampMs();
int64_t elapsed = cur - prev;
printf("add %d, elapsed time: %lld ms, avg elapsed:%f ms, total:%d\n", 100000, elapsed, elapsed / 100000.0, i);
prev = cur;
}
}
int64_t e = taosGetTimestampMs();
printf("total:%lld ms, avg:%f\n", e - s, (e - s) / (double)size);
printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level);
assert(tSkipListGetSize(pSkipList) == size);
printf("the level of skiplist is:\n");
// printf("level two------------------\n");
// tSkipListPrint(pSkipList, 2);
//
// printf("level three------------------\n");
// tSkipListPrint(pSkipList, 3);
//
// printf("level four------------------\n");
// tSkipListPrint(pSkipList, 4);
//
// printf("level nine------------------\n");
// tSkipListPrint(pSkipList, 10);
int64_t st = taosGetTimestampMs();
#if 0
for (int32_t i = 0; i < 100000; i += 1) {
key.dKey = i * 0.997;
tSkipListRemove(pSkipList, &key);
}
#endif
int64_t et = taosGetTimestampMs();
printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st);
assert(tSkipListGetSize(pSkipList) == size);
tSkipListDestroy(pSkipList);
tfree(total);
}
// todo not support duplicated key yet
void duplicatedKeyTest() {
#if 0
SSkipListKey key;
key.nType = TSDB_DATA_TYPE_INT;
SSkipListNode **pNodes = NULL;
SSkipList *pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_INT, sizeof(int));
for (int32_t i = 0; i < 10000; ++i) {
for (int32_t j = 0; j < 5; ++j) {
key.i64Key = i;
tSkipListPut(pSkipList, "", &key, 1);
}
}
tSkipListPrint(pSkipList, 1);
for (int32_t i = 0; i < 100; ++i) {
key.i64Key = rand() % 1000;
int32_t size = tSkipListGets(pSkipList, &key, &pNodes);
assert(size == 5);
tfree(pNodes);
}
tSkipListDestroy(pSkipList);
#endif
}
} // namespace
TEST(testCase, skiplist_test) {
assert(sizeof(SSkipListKey) == 8);
srand(time(NULL));
// stringKeySkiplistTest();
// doubleSkipListTest();
skiplistPerformanceTest();
// duplicatedKeyTest();
// tSKipListQueryCond q;
// q.upperBndRelOptr = true;
// q.lowerBndRelOptr = true;
// q.upperBnd.nType = TSDB_DATA_TYPE_DOUBLE;
// q.lowerBnd.nType = TSDB_DATA_TYPE_DOUBLE;
// q.lowerBnd.dKey = 120;
// q.upperBnd.dKey = 171.989;
/*
int32_t size = tSkipListQuery(pSkipList, &q, &pNodes);
for (int32_t i = 0; i < size; ++i) {
printf("-----%lf\n", pNodes[i]->key.dKey);
}
printf("the range query result size is: %d\n", size);
tfree(pNodes);
SSkipListKey *pKeys = malloc(sizeof(SSkipListKey) * 20);
for (int32_t i = 0; i < 8; i += 2) {
pKeys[i].dKey = i * 0.997;
pKeys[i].nType = TSDB_DATA_TYPE_DOUBLE;
printf("%lf ", pKeys[i].dKey);
}
int32_t r = tSkipListPointQuery(pSkipList, pKeys, 8, EXCLUDE_POINT_QUERY, &pNodes);
printf("\nthe exclude query result is: %d\n", r);
for (int32_t i = 0; i < r; ++i) {
// printf("%lf ", pNodes[i]->key.dKey);
}
tfree(pNodes);
free(pKeys);*/
getchar();
}
...@@ -719,7 +719,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -719,7 +719,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
if (pTable->mem == NULL) { if (pTable->mem == NULL) {
pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable));
if (pTable->mem == NULL) return -1; if (pTable->mem == NULL) return -1;
pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTupleKey);
pTable->mem->keyFirst = INT64_MAX; pTable->mem->keyFirst = INT64_MAX;
pTable->mem->keyLast = 0; pTable->mem->keyLast = 0;
} }
...@@ -742,7 +742,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -742,7 +742,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
if (pTable->mem == NULL) { if (pTable->mem == NULL) {
pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable));
if (pTable->mem == NULL) return -1; if (pTable->mem == NULL) return -1;
pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTupleKey);
pTable->mem->keyFirst = INT64_MAX; pTable->mem->keyFirst = INT64_MAX;
pTable->mem->keyLast = 0; pTable->mem->keyLast = 0;
} }
...@@ -1162,7 +1162,7 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { ...@@ -1162,7 +1162,7 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
return 0; return 0;
} }
static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) {
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
SCompData *pCompData = NULL; SCompData *pCompData = NULL;
SFile * pFile = NULL; SFile * pFile = NULL;
......
...@@ -102,7 +102,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { ...@@ -102,7 +102,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
pTable->pIndex = pTable->pIndex =
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey); tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, 0, getTupleKey);
} }
tsdbAddTableToMeta(pMeta, pTable, false); tsdbAddTableToMeta(pMeta, pTable, false);
...@@ -207,7 +207,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -207,7 +207,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
super->tagSchema = tdDupSchema(pCfg->tagSchema); super->tagSchema = tdDupSchema(pCfg->tagSchema);
super->tagVal = tdDataRowDup(pCfg->tagValues); super->tagVal = tdDataRowDup(pCfg->tagValues);
super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
0, getTupleKey); // Allow duplicate key, no lock 0, 0, getTupleKey); // Allow duplicate key, no lock
if (super->pIndex == NULL) { if (super->pIndex == NULL) {
tdFreeSchema(super->schema); tdFreeSchema(super->schema);
......
...@@ -425,7 +425,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { ...@@ -425,7 +425,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) {
// temporarily keep the position value, in case of no data qualified when move forwards(backwards) // temporarily keep the position value, in case of no data qualified when move forwards(backwards)
SQueryFilePos save = pQueryHandle->cur; SQueryFilePos save = pQueryHandle->cur;
// fileIndex = getNextDataFileCompInfo_(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step); // fileIndex = getNextDataFileCompInfo(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step);
// first data block in the next file // first data block in the next file
if (fileIndex >= 0) { if (fileIndex >= 0) {
...@@ -441,7 +441,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { ...@@ -441,7 +441,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) {
// if (nextTimestamp < 0) { // if (nextTimestamp < 0) {
// pQueryHandle->cur = save; // pQueryHandle->cur = save;
// } // }
//
// return (nextTimestamp > 0); // return (nextTimestamp > 0);
} }
...@@ -460,11 +460,10 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { ...@@ -460,11 +460,10 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) {
cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1; cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1;
// return loadQaulifiedData(pQueryHandle); // return loadQaulifiedData(pQueryHandle);
} else { // data in cache } else { // data in cache
// todo continue; return hasMoreDataInCacheForSingleModel(pQueryHandle);
} }
} }
int vnodeBinarySearchKey(char *pValue, int num, TSKEY key, int order) { int vnodeBinarySearchKey(char *pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1; int firstPos, lastPos, midPos = -1;
int numOfPoints; int numOfPoints;
...@@ -579,8 +578,6 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) { ...@@ -579,8 +578,6 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) {
} }
} }
assert(pQueryHandle->realNumOfRows <= blockInfo.size); assert(pQueryHandle->realNumOfRows <= blockInfo.size);
// forward(backward) the position for cursor // forward(backward) the position for cursor
...@@ -592,7 +589,9 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf ...@@ -592,7 +589,9 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
int32_t fid = getFileIdFromKey(pCheckInfo->lastKey); int32_t fid = getFileIdFromKey(pCheckInfo->lastKey);
SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid); SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid);
pCheckInfo->checkFirstFileBlock = true; if (fileGroup == NULL) {
return false;
}
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
...@@ -660,34 +659,28 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf ...@@ -660,34 +659,28 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) { static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) {
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
SQueryFilePos* cur = &pHandle->cur; SQueryFilePos* cur = &pHandle->cur;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
if (!pCheckInfo->checkFirstFileBlock && pFileHandle != NULL) { if (!pCheckInfo->checkFirstFileBlock) {
int32_t fid = getFileIdFromKey(pCheckInfo->lastKey);
SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid);
pCheckInfo->checkFirstFileBlock = true; pCheckInfo->checkFirstFileBlock = true;
if (fileGroup != NULL) { if (pFileHandle != NULL) {
return getQualifiedDataBlock(pHandle, pCheckInfo, 1); bool found = getQualifiedDataBlock(pHandle, pCheckInfo, 1);
} else { // no data in file, try cache if (found) {
return hasMoreDataInCacheForSingleModel(pHandle); return true;
} }
} else {
pCheckInfo->checkFirstFileBlock = true;
if (pFileHandle == NULL) {
cur->fid = -1;
} }
if (cur->fid == -1 || pFileHandle != NULL) { // try data in cache // no data in file, try cache
pHandle->cur.fid = -1;
return hasMoreDataInCacheForSingleModel(pHandle); return hasMoreDataInCacheForSingleModel(pHandle);
} else { } else { // move to next data block in file or in cache
return true; return moveToNextBlock(pHandle, 1);
} }
}
} }
static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) { static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册