From 18a63bd3f10cf6cd319977f004f07ef350683b52 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 30 Mar 2020 14:43:01 +0800 Subject: [PATCH] [TD-32] merge 2.0 branch --- src/util/inc/tskiplist.h | 42 +++-- src/util/src/tskiplist.c | 102 ++--------- src/util/tests/cacheTest.cpp | 4 +- src/util/tests/hashTest.cpp | 2 +- src/util/tests/skiplistTest.cpp | 314 ++++++++++++++++++++++++++++++++ src/vnode/tsdb/src/tsdbMain.c | 6 +- src/vnode/tsdb/src/tsdbMeta.c | 4 +- src/vnode/tsdb/src/tsdbRead.c | 47 ++--- 8 files changed, 379 insertions(+), 142 deletions(-) create mode 100644 src/util/tests/skiplistTest.cpp diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 0ec1d0eab5..18404f89c2 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -51,7 +51,7 @@ typedef struct SSkipListNode { #define SL_GET_BACKWARD_POINTER(n, _l) \ ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)] -#define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level)) +#define SL_GET_NODE_DATA(n) ((char *)(n) + SL_NODE_HEADER_SIZE((n)->level)) #define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n))) #define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n)) @@ -106,25 +106,25 @@ typedef struct tSkipListState { typedef struct SSkipListKeyInfo { uint8_t dupKey : 2; // if allow duplicated key in the skip list - uint8_t type : 6; // key type + uint8_t type : 4; // key type + uint8_t freeNode:2; // free node when destroy the skiplist uint8_t len; // maximum key length, used in case of string key } SSkipListKeyInfo; typedef struct SSkipList { - __compar_fn_t comparFn; - __sl_key_fn_t keyFn; - uint32_t size; - uint8_t maxLevel; - uint8_t level; - SSkipListKeyInfo keyInfo; - + __compar_fn_t comparFn; + __sl_key_fn_t keyFn; + uint32_t size; + uint8_t maxLevel; + uint8_t level; + SSkipListKeyInfo keyInfo; pthread_rwlock_t *lock; - SSkipListNode * pHead; - + SSkipListNode * pHead; // point to the first element + SSkipListNode * pTail; // point to the last element + void * lastKey; // last key in the skiplist #if SKIP_LIST_RECORD_PERFORMANCE tSkipListState state; // skiplist state #endif - } SSkipList; /* @@ -147,7 +147,7 @@ typedef struct SSkipListIterator { * @return */ SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t threadsafe, - __sl_key_fn_t fn); + uint8_t freeNode, __sl_key_fn_t fn); /** * @@ -182,21 +182,28 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); * @param keyType * @return */ -SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType); +SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType); /** * get the size of skip list * @param pSkipList * @return */ -size_t tSkipListGetSize(const SSkipList* pSkipList); +size_t tSkipListGetSize(const SSkipList *pSkipList); + +/** + * display skip list of the given level, for debug purpose only + * @param pSkipList + * @param nlevel + */ +void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); /** * create skiplist iterator * @param pSkipList * @return */ -SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList); +SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList); /** * forward the skip list iterator @@ -217,7 +224,7 @@ SSkipListNode *tSkipListIterGet(SSkipListIterator *iter); * @param iter * @return */ -void* tSkipListDestroyIter(SSkipListIterator* iter); +void *tSkipListDestroyIter(SSkipListIterator *iter); /* * remove only one node of the pKey value. @@ -234,7 +241,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey); */ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); - #ifdef __cplusplus } #endif diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index c46f45fd37..1760919b05 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -281,7 +281,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) { } SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock, - __sl_key_fn_t fn) { + uint8_t freeNode, __sl_key_fn_t fn) { SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); if (pSkipList == NULL) { return NULL; @@ -291,9 +291,8 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui maxLevel = MAX_SKIP_LIST_LEVEL; } - pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey}; + pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey, .freeNode = freeNode}; pSkipList->keyFn = fn; - pSkipList->comparFn = getKeyComparator(keyType); pSkipList->maxLevel = maxLevel; pSkipList->level = 1; @@ -348,12 +347,15 @@ void *tSkipListDestroy(SSkipList *pSkipList) { pthread_rwlock_wrlock(pSkipList->lock); } - SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); // pSkipList->pHead.pForward[0]; + SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); while (pNode) { SSkipListNode *pTemp = pNode; pNode = SL_GET_FORWARD_POINTER(pNode, 0); - tfree(pTemp); + + if (pSkipList->keyInfo.freeNode) { + tfree(pTemp); + } } tfree(pSkipList->pHead); @@ -435,7 +437,11 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { recordNodeEachLevel(pSkipList, level); #endif + // clear pointer area int32_t level = SL_GET_NODE_LEVEL(pNode); + memset(pNode, 0, SL_NODE_HEADER_SIZE(pNode->level)); + pNode->level = level; + tSkipListDoInsert(pSkipList, forward, level, pNode); atomic_add_fetch_32(&pSkipList->size, 1); @@ -691,89 +697,6 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) { // return NULL; //} // -// int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *), -// void *param) { -// (*pRes) = (SSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize); -// if (NULL == *pRes) { -// pError("error skiplist %p, malloc failed", pSkipList); -// return -1; -// } -// -// pthread_rwlock_rdlock(&pSkipList->lock); -// SSkipListNode *pStartNode = pSkipList->pHead.pForward[0]; -// int32_t num = 0; -// -// for (int32_t i = 0; i < pSkipList->nSize; ++i) { -// if (pStartNode == NULL) { -// pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1); -//#ifdef _DEBUG_VIEW -// tSkipListPrint(pSkipList, 1); -//#endif -// break; -// } -// -// if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) { -// (*pRes)[num++] = pStartNode; -// } -// -// pStartNode = pStartNode->pForward[0]; -// } -// -// pthread_rwlock_unlock(&pSkipList->lock); -// -// if (num == 0) { -// free(*pRes); -// *pRes = NULL; -// } else if (num < pSkipList->nSize) { // free unused memory -// char *tmp = realloc((*pRes), num * POINTER_BYTES); -// assert(tmp != NULL); -// -// *pRes = (SSkipListNode **)tmp; -// } -// -// return num; -//} -// -// int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter) { -// if (pSkipList == NULL) { -// return -1; -// } -// -// iter->pSkipList = pSkipList; -// if (pSkipList->lock) { -// pthread_rwlock_rdlock(&pSkipList->lock); -// } -// iter->cur = NULL; // pSkipList->pHead.pForward[0]; -// iter->num = pSkipList->size; -// -// if (pSkipList->lock) { -// pthread_rwlock_unlock(&pSkipList->lock); -// } -// -// return 0; -//} -// -// bool tSkipListIteratorNext(SSkipListIterator *iter) { -// if (iter->num == 0 || iter->pSkipList == NULL) { -// return false; -// } -// -// SSkipList *pSkipList = iter->pSkipList; -// -// pthread_rwlock_rdlock(&pSkipList->lock); -// if (iter->cur == NULL) { -// iter->cur = pSkipList->pHead.pForward[0]; -// } else { -// iter->cur = iter->cur->pForward[0]; -// } -// -// pthread_rwlock_unlock(&pSkipList->lock); -// -// return iter->cur != NULL; -//} -// -// SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter) { return iter->cur; } -// // int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) { // pSkipList->state.queryCount++; // SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr); @@ -841,7 +764,8 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { } SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); - int32_t id = 1; + + int32_t id = 1; while (p) { char *key = SL_GET_NODE_KEY(pSkipList, p); diff --git a/src/util/tests/cacheTest.cpp b/src/util/tests/cacheTest.cpp index 411c899cc0..1902fef4b6 100644 --- a/src/util/tests/cacheTest.cpp +++ b/src/util/tests/cacheTest.cpp @@ -125,7 +125,7 @@ TEST(testCase, cache_resize_test) { } uint64_t endTime = taosGetTimestampUs(); - printf("add 10,000,000 object cost:%lld us, avg:%f us\n", endTime - startTime, (endTime-startTime)/(double)num); + printf("add %d object cost:%lld us, avg:%f us\n", num, endTime - startTime, (endTime-startTime)/(double)num); startTime = taosGetTimestampUs(); for(int32_t i = 0; i < num; ++i) { @@ -134,7 +134,7 @@ TEST(testCase, cache_resize_test) { assert(k != 0); } endTime = taosGetTimestampUs(); - printf("retrieve 10,000,000 object cost:%lld us,avg:%f\n", endTime - startTime, (endTime - startTime)/(double)num); + printf("retrieve %d object cost:%lld us,avg:%f\n", num, endTime - startTime, (endTime - startTime)/(double)num); taosCacheCleanup(pCache); taosMsleep(20000); diff --git a/src/util/tests/hashTest.cpp b/src/util/tests/hashTest.cpp index 93a1989741..95357886d7 100644 --- a/src/util/tests/hashTest.cpp +++ b/src/util/tests/hashTest.cpp @@ -151,6 +151,6 @@ int main(int argc, char** argv) { TEST(testCase, hashTest) { simpleTest(); stringKeyTest(); - noLockPerformanceTest(); +// noLockPerformanceTest(); multithreadsTest(); } \ No newline at end of file diff --git a/src/util/tests/skiplistTest.cpp b/src/util/tests/skiplistTest.cpp new file mode 100644 index 0000000000..7a3e87d786 --- /dev/null +++ b/src/util/tests/skiplistTest.cpp @@ -0,0 +1,314 @@ +#include +#include +#include +#include + +#include "taosmsg.h" +#include "tskiplist.h" +#include "ttime.h" +#include "tutil.h" + +namespace { + +char* getkey(const void* data) { return (char*)(data); } + +void doubleSkipListTest() { + SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, true, getkey); + + double doubleVal[1000] = {0}; + int32_t size = 20000; + + printf("generated %d keys is: \n", size); + + for (int32_t i = 0; i < size; ++i) { + if (i < 1000) { + doubleVal[i] = i * 0.997; + } + + int32_t level = 0; + int32_t size = 0; + + tSkipListRandNodeInfo(pSkipList, &level, &size); + auto d = (SSkipListNode*)calloc(1, size + sizeof(double) * 2); + d->level = level; + + double* key = (double*)SL_GET_NODE_KEY(pSkipList, d); + key[0] = i * 0.997; + key[1] = i * 0.997; + + tSkipListPut(pSkipList, d); + } + + printf("the first level of skip list is:\n"); + tSkipListPrint(pSkipList, 1); + +#if 0 + SSkipListNode **pNodes = NULL; + SSkipListKey sk; + for (int32_t i = 0; i < 100; ++i) { + sk.nType = TSDB_DATA_TYPE_DOUBLE; + int32_t idx = abs((i * rand()) % 1000); + + sk.dKey = doubleVal[idx]; + + int32_t size = tSkipListGets(pSkipList, &sk, &pNodes); + + printf("the query result size is: %d\n", size); + for (int32_t j = 0; j < size; ++j) { + printf("the result is: %lf\n", pNodes[j]->key.dKey); + } + + if (size > 0) { + tfree(pNodes); + } + } + +#endif + + printf("double test end...\n"); + tSkipListDestroy(pSkipList); +} + +void stringKeySkiplistTest() { + const int32_t max_key_size = 12; + + SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_BINARY, max_key_size, 0, false, true, getkey); + + int32_t level = 0; + int32_t headsize = 0; + tSkipListRandNodeInfo(pSkipList, &level, &headsize); + + auto pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double)); + pNode->level = level; + + char* d = SL_GET_NODE_DATA(pNode); + strncpy(d, "nyse", 5); + + *(double*)(d + max_key_size) = 12; + + tSkipListPut(pSkipList, pNode); + + tSkipListRandNodeInfo(pSkipList, &level, &headsize); + + pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double)); + pNode->level = level; + + d = SL_GET_NODE_DATA(pNode); + strncpy(d, "beijing", 8); + + *(double*)(d + max_key_size) = 911; + + tSkipListPut(pSkipList, pNode); + +#if 0 + SSkipListNode **pRes = NULL; + int32_t ret = tSkipListGets(pSkipList, &key1, &pRes); + + assert(ret == 1); + assert(strcmp(pRes[0]->key.pz, "beijing") == 0); + assert(pRes[0]->key.nType == TSDB_DATA_TYPE_BINARY); + + tSkipListDestroyKey(&key1); + tSkipListDestroyKey(&key); + + tSkipListDestroy(pSkipList); + + free(pRes); +#endif + + tSkipListDestroy(pSkipList); + + int64_t s = taosGetTimestampUs(); + pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_BINARY, 20, 0, false, true, getkey); + char k[256] = {0}; + + int32_t total = 10000; + for (int32_t i = 0; i < total; ++i) { + int32_t n = sprintf(k, "abc_%d_%d", i, i); + tSkipListRandNodeInfo(pSkipList, &level, &headsize); + + auto pNode = (SSkipListNode*)calloc(1, headsize + 20 + sizeof(double)); + pNode->level = level; + + char* d = SL_GET_NODE_DATA(pNode); + strncpy(d, k, strlen(k)); + + tSkipListPut(pSkipList, pNode); + } + + int64_t e = taosGetTimestampUs(); + printf("elapsed time:%lld us to insert %d data, avg:%f us\n", (e - s), total, (double)(e - s) / total); + +#if 0 + SSkipListNode **pres = NULL; + + s = taosGetTimestampMs(); + for (int32_t j = 0; j < total; ++j) { + int32_t n = sprintf(k, "abc_%d_%d", j, j); + key = tSkipListCreateKey(TSDB_DATA_TYPE_BINARY, k, n); + + int32_t num = tSkipListGets(pSkipList, &key, &pres); + assert(num > 0); + + // tSkipListRemove(pSkipList, &key); + tSkipListRemoveNode(pSkipList, pres[0]); + + if (num > 0) { + tfree(pres); + } + } + + e = taosGetTimestampMs(); + printf("elapsed time:%lldms\n", e - s); +#endif + tSkipListDestroy(pSkipList); +} + +void skiplistPerformanceTest() { + SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, false, getkey); + + int32_t size = 900000; + int64_t prev = taosGetTimestampMs(); + int64_t s = prev; + + int32_t level = 0; + int32_t headsize = 0; + + int32_t unit = MAX_SKIP_LIST_LEVEL * POINTER_BYTES * 2 + sizeof(double) * 2 + sizeof(int16_t); + + char* total = (char*)calloc(1, unit * size); + char* p = total; + + for (int32_t i = size; i > 0; --i) { + tSkipListRandNodeInfo(pSkipList, &level, &headsize); + + SSkipListNode* d = (SSkipListNode*)p; + p += headsize + sizeof(double) * 2; + + d->level = level; + double* v = (double*)SL_GET_NODE_DATA(d); + v[0] = i * 0.997; + v[1] = i * 0.997; + + tSkipListPut(pSkipList, d); + + if (i % 100000 == 0) { + int64_t cur = taosGetTimestampMs(); + + int64_t elapsed = cur - prev; + printf("add %d, elapsed time: %lld ms, avg elapsed:%f ms, total:%d\n", 100000, elapsed, elapsed / 100000.0, i); + prev = cur; + } + } + + int64_t e = taosGetTimestampMs(); + printf("total:%lld ms, avg:%f\n", e - s, (e - s) / (double)size); + printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level); + + assert(tSkipListGetSize(pSkipList) == size); + + printf("the level of skiplist is:\n"); + +// printf("level two------------------\n"); +// tSkipListPrint(pSkipList, 2); +// +// printf("level three------------------\n"); +// tSkipListPrint(pSkipList, 3); +// +// printf("level four------------------\n"); +// tSkipListPrint(pSkipList, 4); +// +// printf("level nine------------------\n"); +// tSkipListPrint(pSkipList, 10); + + int64_t st = taosGetTimestampMs(); +#if 0 + for (int32_t i = 0; i < 100000; i += 1) { + key.dKey = i * 0.997; + tSkipListRemove(pSkipList, &key); + } +#endif + + int64_t et = taosGetTimestampMs(); + printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st); + assert(tSkipListGetSize(pSkipList) == size); + + tSkipListDestroy(pSkipList); + tfree(total); +} + +// todo not support duplicated key yet +void duplicatedKeyTest() { +#if 0 + SSkipListKey key; + key.nType = TSDB_DATA_TYPE_INT; + + SSkipListNode **pNodes = NULL; + + SSkipList *pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_INT, sizeof(int)); + + for (int32_t i = 0; i < 10000; ++i) { + for (int32_t j = 0; j < 5; ++j) { + key.i64Key = i; + tSkipListPut(pSkipList, "", &key, 1); + } + } + + tSkipListPrint(pSkipList, 1); + + for (int32_t i = 0; i < 100; ++i) { + key.i64Key = rand() % 1000; + int32_t size = tSkipListGets(pSkipList, &key, &pNodes); + + assert(size == 5); + + tfree(pNodes); + } + + tSkipListDestroy(pSkipList); +#endif +} + +} // namespace + +TEST(testCase, skiplist_test) { + assert(sizeof(SSkipListKey) == 8); + srand(time(NULL)); + + // stringKeySkiplistTest(); + // doubleSkipListTest(); + skiplistPerformanceTest(); + // duplicatedKeyTest(); + + // tSKipListQueryCond q; + // q.upperBndRelOptr = true; + // q.lowerBndRelOptr = true; + // q.upperBnd.nType = TSDB_DATA_TYPE_DOUBLE; + // q.lowerBnd.nType = TSDB_DATA_TYPE_DOUBLE; + // q.lowerBnd.dKey = 120; + // q.upperBnd.dKey = 171.989; + /* + int32_t size = tSkipListQuery(pSkipList, &q, &pNodes); + for (int32_t i = 0; i < size; ++i) { + printf("-----%lf\n", pNodes[i]->key.dKey); + } + printf("the range query result size is: %d\n", size); + tfree(pNodes); + + SSkipListKey *pKeys = malloc(sizeof(SSkipListKey) * 20); + for (int32_t i = 0; i < 8; i += 2) { + pKeys[i].dKey = i * 0.997; + pKeys[i].nType = TSDB_DATA_TYPE_DOUBLE; + printf("%lf ", pKeys[i].dKey); + } + + int32_t r = tSkipListPointQuery(pSkipList, pKeys, 8, EXCLUDE_POINT_QUERY, &pNodes); + printf("\nthe exclude query result is: %d\n", r); + for (int32_t i = 0; i < r; ++i) { + // printf("%lf ", pNodes[i]->key.dKey); + } + tfree(pNodes); + + free(pKeys);*/ + getchar(); +} diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index b1a531455d..bf870a0d05 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -719,7 +719,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable if (pTable->mem == NULL) { pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); if (pTable->mem == NULL) return -1; - pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTupleKey); pTable->mem->keyFirst = INT64_MAX; pTable->mem->keyLast = 0; } @@ -742,7 +742,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable if (pTable->mem == NULL) { pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); if (pTable->mem == NULL) return -1; - pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTupleKey); pTable->mem->keyFirst = INT64_MAX; pTable->mem->keyLast = 0; } @@ -1162,7 +1162,7 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { return 0; } -static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { +int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { STsdbCfg * pCfg = &(pRepo->config); SCompData *pCompData = NULL; SFile * pFile = NULL; diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 1dd1c3b29d..22680a839b 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -102,7 +102,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { if (pTable->type == TSDB_SUPER_TABLE) { pTable->pIndex = - tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey); + tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, 0, getTupleKey); } tsdbAddTableToMeta(pMeta, pTable, false); @@ -207,7 +207,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { super->tagSchema = tdDupSchema(pCfg->tagSchema); super->tagVal = tdDataRowDup(pCfg->tagValues); super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, - 0, getTupleKey); // Allow duplicate key, no lock + 0, 0, getTupleKey); // Allow duplicate key, no lock if (super->pIndex == NULL) { tdFreeSchema(super->schema); diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 3ed6e22ad8..2919b2cf9e 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -362,7 +362,7 @@ static int32_t getFileCompInfo(STableCheckInfo* pCheckInfo, SFileGroup* fileGrou SCompIdx* compIndex = &pCheckInfo->compIndex[pCheckInfo->tableId.tid]; if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file - + } else { tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pBlock); } @@ -425,7 +425,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { // temporarily keep the position value, in case of no data qualified when move forwards(backwards) SQueryFilePos save = pQueryHandle->cur; -// fileIndex = getNextDataFileCompInfo_(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step); +// fileIndex = getNextDataFileCompInfo(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step); // first data block in the next file if (fileIndex >= 0) { @@ -441,7 +441,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { // if (nextTimestamp < 0) { // pQueryHandle->cur = save; // } - +// // return (nextTimestamp > 0); } @@ -460,11 +460,10 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1; // return loadQaulifiedData(pQueryHandle); } else { // data in cache -// todo continue; + return hasMoreDataInCacheForSingleModel(pQueryHandle); } } - int vnodeBinarySearchKey(char *pValue, int num, TSKEY key, int order) { int firstPos, lastPos, midPos = -1; int numOfPoints; @@ -579,8 +578,6 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) { } } - - assert(pQueryHandle->realNumOfRows <= blockInfo.size); // forward(backward) the position for cursor @@ -592,7 +589,9 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf int32_t fid = getFileIdFromKey(pCheckInfo->lastKey); SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid); - pCheckInfo->checkFirstFileBlock = true; + if (fileGroup == NULL) { + return false; + } SQueryFilePos* cur = &pQueryHandle->cur; @@ -636,7 +635,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf // todo no need to loaded at all cur->slot = index; - + // sa = getDefaultLoadColumns(pQueryHandle, true); if (tsdbLoadDataBlock(&fileGroup->files[2], &pCheckInfo->pBlock[cur->slot], 1, fid, sa) == 0) { blockLoaded = true; @@ -660,34 +659,28 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) { assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); + STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); SQueryFilePos* cur = &pHandle->cur; STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); - if (!pCheckInfo->checkFirstFileBlock && pFileHandle != NULL) { - int32_t fid = getFileIdFromKey(pCheckInfo->lastKey); - SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid); + if (!pCheckInfo->checkFirstFileBlock) { pCheckInfo->checkFirstFileBlock = true; - if (fileGroup != NULL) { - return getQualifiedDataBlock(pHandle, pCheckInfo, 1); - } else { // no data in file, try cache - return hasMoreDataInCacheForSingleModel(pHandle); - } - } else { - pCheckInfo->checkFirstFileBlock = true; - if (pFileHandle == NULL) { - cur->fid = -1; + if (pFileHandle != NULL) { + bool found = getQualifiedDataBlock(pHandle, pCheckInfo, 1); + if (found) { + return true; + } } - if (cur->fid == -1 || pFileHandle != NULL) { // try data in cache - return hasMoreDataInCacheForSingleModel(pHandle); - } else { - return true; - } + // no data in file, try cache + pHandle->cur.fid = -1; + return hasMoreDataInCacheForSingleModel(pHandle); + } else { // move to next data block in file or in cache + return moveToNextBlock(pHandle, 1); } - } static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) { -- GitLab