diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 0ec1d0eab52298913029f455e1566d9cf5d6630b..18404f89c27d2c122e813edae674bb3869c451b7 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -51,7 +51,7 @@ typedef struct SSkipListNode { #define SL_GET_BACKWARD_POINTER(n, _l) \ ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)] -#define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level)) +#define SL_GET_NODE_DATA(n) ((char *)(n) + SL_NODE_HEADER_SIZE((n)->level)) #define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n))) #define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n)) @@ -106,25 +106,25 @@ typedef struct tSkipListState { typedef struct SSkipListKeyInfo { uint8_t dupKey : 2; // if allow duplicated key in the skip list - uint8_t type : 6; // key type + uint8_t type : 4; // key type + uint8_t freeNode:2; // free node when destroy the skiplist uint8_t len; // maximum key length, used in case of string key } SSkipListKeyInfo; typedef struct SSkipList { - __compar_fn_t comparFn; - __sl_key_fn_t keyFn; - uint32_t size; - uint8_t maxLevel; - uint8_t level; - SSkipListKeyInfo keyInfo; - + __compar_fn_t comparFn; + __sl_key_fn_t keyFn; + uint32_t size; + uint8_t maxLevel; + uint8_t level; + SSkipListKeyInfo keyInfo; pthread_rwlock_t *lock; - SSkipListNode * pHead; - + SSkipListNode * pHead; // point to the first element + SSkipListNode * pTail; // point to the last element + void * lastKey; // last key in the skiplist #if SKIP_LIST_RECORD_PERFORMANCE tSkipListState state; // skiplist state #endif - } SSkipList; /* @@ -147,7 +147,7 @@ typedef struct SSkipListIterator { * @return */ SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t threadsafe, - __sl_key_fn_t fn); + uint8_t freeNode, __sl_key_fn_t fn); /** * @@ -182,21 +182,28 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode); * @param keyType * @return */ -SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType); +SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType); /** * get the size of skip list * @param pSkipList * @return */ -size_t tSkipListGetSize(const SSkipList* pSkipList); +size_t tSkipListGetSize(const SSkipList *pSkipList); + +/** + * display skip list of the given level, for debug purpose only + * @param pSkipList + * @param nlevel + */ +void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); /** * create skiplist iterator * @param pSkipList * @return */ -SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList); +SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList); /** * forward the skip list iterator @@ -217,7 +224,7 @@ SSkipListNode *tSkipListIterGet(SSkipListIterator *iter); * @param iter * @return */ -void* tSkipListDestroyIter(SSkipListIterator* iter); +void *tSkipListDestroyIter(SSkipListIterator *iter); /* * remove only one node of the pKey value. @@ -234,7 +241,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey); */ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); - #ifdef __cplusplus } #endif diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index c46f45fd37178147c7bb2903462290f861ac6189..1760919b053ffcd1ea9703b1b7a51bd023d49547 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -281,7 +281,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) { } SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock, - __sl_key_fn_t fn) { + uint8_t freeNode, __sl_key_fn_t fn) { SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList)); if (pSkipList == NULL) { return NULL; @@ -291,9 +291,8 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui maxLevel = MAX_SKIP_LIST_LEVEL; } - pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey}; + pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey, .freeNode = freeNode}; pSkipList->keyFn = fn; - pSkipList->comparFn = getKeyComparator(keyType); pSkipList->maxLevel = maxLevel; pSkipList->level = 1; @@ -348,12 +347,15 @@ void *tSkipListDestroy(SSkipList *pSkipList) { pthread_rwlock_wrlock(pSkipList->lock); } - SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); // pSkipList->pHead.pForward[0]; + SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); while (pNode) { SSkipListNode *pTemp = pNode; pNode = SL_GET_FORWARD_POINTER(pNode, 0); - tfree(pTemp); + + if (pSkipList->keyInfo.freeNode) { + tfree(pTemp); + } } tfree(pSkipList->pHead); @@ -435,7 +437,11 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { recordNodeEachLevel(pSkipList, level); #endif + // clear pointer area int32_t level = SL_GET_NODE_LEVEL(pNode); + memset(pNode, 0, SL_NODE_HEADER_SIZE(pNode->level)); + pNode->level = level; + tSkipListDoInsert(pSkipList, forward, level, pNode); atomic_add_fetch_32(&pSkipList->size, 1); @@ -691,89 +697,6 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) { // return NULL; //} // -// int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *), -// void *param) { -// (*pRes) = (SSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize); -// if (NULL == *pRes) { -// pError("error skiplist %p, malloc failed", pSkipList); -// return -1; -// } -// -// pthread_rwlock_rdlock(&pSkipList->lock); -// SSkipListNode *pStartNode = pSkipList->pHead.pForward[0]; -// int32_t num = 0; -// -// for (int32_t i = 0; i < pSkipList->nSize; ++i) { -// if (pStartNode == NULL) { -// pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1); -//#ifdef _DEBUG_VIEW -// tSkipListPrint(pSkipList, 1); -//#endif -// break; -// } -// -// if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) { -// (*pRes)[num++] = pStartNode; -// } -// -// pStartNode = pStartNode->pForward[0]; -// } -// -// pthread_rwlock_unlock(&pSkipList->lock); -// -// if (num == 0) { -// free(*pRes); -// *pRes = NULL; -// } else if (num < pSkipList->nSize) { // free unused memory -// char *tmp = realloc((*pRes), num * POINTER_BYTES); -// assert(tmp != NULL); -// -// *pRes = (SSkipListNode **)tmp; -// } -// -// return num; -//} -// -// int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter) { -// if (pSkipList == NULL) { -// return -1; -// } -// -// iter->pSkipList = pSkipList; -// if (pSkipList->lock) { -// pthread_rwlock_rdlock(&pSkipList->lock); -// } -// iter->cur = NULL; // pSkipList->pHead.pForward[0]; -// iter->num = pSkipList->size; -// -// if (pSkipList->lock) { -// pthread_rwlock_unlock(&pSkipList->lock); -// } -// -// return 0; -//} -// -// bool tSkipListIteratorNext(SSkipListIterator *iter) { -// if (iter->num == 0 || iter->pSkipList == NULL) { -// return false; -// } -// -// SSkipList *pSkipList = iter->pSkipList; -// -// pthread_rwlock_rdlock(&pSkipList->lock); -// if (iter->cur == NULL) { -// iter->cur = pSkipList->pHead.pForward[0]; -// } else { -// iter->cur = iter->cur->pForward[0]; -// } -// -// pthread_rwlock_unlock(&pSkipList->lock); -// -// return iter->cur != NULL; -//} -// -// SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter) { return iter->cur; } -// // int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) { // pSkipList->state.queryCount++; // SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr); @@ -841,7 +764,8 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { } SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); - int32_t id = 1; + + int32_t id = 1; while (p) { char *key = SL_GET_NODE_KEY(pSkipList, p); diff --git a/src/util/tests/cacheTest.cpp b/src/util/tests/cacheTest.cpp index 411c899cc010e3f3ca3c39e1d0ff2c06c1c5e873..1902fef4b64d0b796a85d9f58e1f422fe0df59e8 100644 --- a/src/util/tests/cacheTest.cpp +++ b/src/util/tests/cacheTest.cpp @@ -125,7 +125,7 @@ TEST(testCase, cache_resize_test) { } uint64_t endTime = taosGetTimestampUs(); - printf("add 10,000,000 object cost:%lld us, avg:%f us\n", endTime - startTime, (endTime-startTime)/(double)num); + printf("add %d object cost:%lld us, avg:%f us\n", num, endTime - startTime, (endTime-startTime)/(double)num); startTime = taosGetTimestampUs(); for(int32_t i = 0; i < num; ++i) { @@ -134,7 +134,7 @@ TEST(testCase, cache_resize_test) { assert(k != 0); } endTime = taosGetTimestampUs(); - printf("retrieve 10,000,000 object cost:%lld us,avg:%f\n", endTime - startTime, (endTime - startTime)/(double)num); + printf("retrieve %d object cost:%lld us,avg:%f\n", num, endTime - startTime, (endTime - startTime)/(double)num); taosCacheCleanup(pCache); taosMsleep(20000); diff --git a/src/util/tests/skiplistTest.cpp b/src/util/tests/skiplistTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7a3e87d786ca1e2647d90ca6b86922743edef9a9 --- /dev/null +++ b/src/util/tests/skiplistTest.cpp @@ -0,0 +1,314 @@ +#include +#include +#include +#include + +#include "taosmsg.h" +#include "tskiplist.h" +#include "ttime.h" +#include "tutil.h" + +namespace { + +char* getkey(const void* data) { return (char*)(data); } + +void doubleSkipListTest() { + SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, true, getkey); + + double doubleVal[1000] = {0}; + int32_t size = 20000; + + printf("generated %d keys is: \n", size); + + for (int32_t i = 0; i < size; ++i) { + if (i < 1000) { + doubleVal[i] = i * 0.997; + } + + int32_t level = 0; + int32_t size = 0; + + tSkipListRandNodeInfo(pSkipList, &level, &size); + auto d = (SSkipListNode*)calloc(1, size + sizeof(double) * 2); + d->level = level; + + double* key = (double*)SL_GET_NODE_KEY(pSkipList, d); + key[0] = i * 0.997; + key[1] = i * 0.997; + + tSkipListPut(pSkipList, d); + } + + printf("the first level of skip list is:\n"); + tSkipListPrint(pSkipList, 1); + +#if 0 + SSkipListNode **pNodes = NULL; + SSkipListKey sk; + for (int32_t i = 0; i < 100; ++i) { + sk.nType = TSDB_DATA_TYPE_DOUBLE; + int32_t idx = abs((i * rand()) % 1000); + + sk.dKey = doubleVal[idx]; + + int32_t size = tSkipListGets(pSkipList, &sk, &pNodes); + + printf("the query result size is: %d\n", size); + for (int32_t j = 0; j < size; ++j) { + printf("the result is: %lf\n", pNodes[j]->key.dKey); + } + + if (size > 0) { + tfree(pNodes); + } + } + +#endif + + printf("double test end...\n"); + tSkipListDestroy(pSkipList); +} + +void stringKeySkiplistTest() { + const int32_t max_key_size = 12; + + SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_BINARY, max_key_size, 0, false, true, getkey); + + int32_t level = 0; + int32_t headsize = 0; + tSkipListRandNodeInfo(pSkipList, &level, &headsize); + + auto pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double)); + pNode->level = level; + + char* d = SL_GET_NODE_DATA(pNode); + strncpy(d, "nyse", 5); + + *(double*)(d + max_key_size) = 12; + + tSkipListPut(pSkipList, pNode); + + tSkipListRandNodeInfo(pSkipList, &level, &headsize); + + pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double)); + pNode->level = level; + + d = SL_GET_NODE_DATA(pNode); + strncpy(d, "beijing", 8); + + *(double*)(d + max_key_size) = 911; + + tSkipListPut(pSkipList, pNode); + +#if 0 + SSkipListNode **pRes = NULL; + int32_t ret = tSkipListGets(pSkipList, &key1, &pRes); + + assert(ret == 1); + assert(strcmp(pRes[0]->key.pz, "beijing") == 0); + assert(pRes[0]->key.nType == TSDB_DATA_TYPE_BINARY); + + tSkipListDestroyKey(&key1); + tSkipListDestroyKey(&key); + + tSkipListDestroy(pSkipList); + + free(pRes); +#endif + + tSkipListDestroy(pSkipList); + + int64_t s = taosGetTimestampUs(); + pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_BINARY, 20, 0, false, true, getkey); + char k[256] = {0}; + + int32_t total = 10000; + for (int32_t i = 0; i < total; ++i) { + int32_t n = sprintf(k, "abc_%d_%d", i, i); + tSkipListRandNodeInfo(pSkipList, &level, &headsize); + + auto pNode = (SSkipListNode*)calloc(1, headsize + 20 + sizeof(double)); + pNode->level = level; + + char* d = SL_GET_NODE_DATA(pNode); + strncpy(d, k, strlen(k)); + + tSkipListPut(pSkipList, pNode); + } + + int64_t e = taosGetTimestampUs(); + printf("elapsed time:%lld us to insert %d data, avg:%f us\n", (e - s), total, (double)(e - s) / total); + +#if 0 + SSkipListNode **pres = NULL; + + s = taosGetTimestampMs(); + for (int32_t j = 0; j < total; ++j) { + int32_t n = sprintf(k, "abc_%d_%d", j, j); + key = tSkipListCreateKey(TSDB_DATA_TYPE_BINARY, k, n); + + int32_t num = tSkipListGets(pSkipList, &key, &pres); + assert(num > 0); + + // tSkipListRemove(pSkipList, &key); + tSkipListRemoveNode(pSkipList, pres[0]); + + if (num > 0) { + tfree(pres); + } + } + + e = taosGetTimestampMs(); + printf("elapsed time:%lldms\n", e - s); +#endif + tSkipListDestroy(pSkipList); +} + +void skiplistPerformanceTest() { + SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, false, getkey); + + int32_t size = 900000; + int64_t prev = taosGetTimestampMs(); + int64_t s = prev; + + int32_t level = 0; + int32_t headsize = 0; + + int32_t unit = MAX_SKIP_LIST_LEVEL * POINTER_BYTES * 2 + sizeof(double) * 2 + sizeof(int16_t); + + char* total = (char*)calloc(1, unit * size); + char* p = total; + + for (int32_t i = size; i > 0; --i) { + tSkipListRandNodeInfo(pSkipList, &level, &headsize); + + SSkipListNode* d = (SSkipListNode*)p; + p += headsize + sizeof(double) * 2; + + d->level = level; + double* v = (double*)SL_GET_NODE_DATA(d); + v[0] = i * 0.997; + v[1] = i * 0.997; + + tSkipListPut(pSkipList, d); + + if (i % 100000 == 0) { + int64_t cur = taosGetTimestampMs(); + + int64_t elapsed = cur - prev; + printf("add %d, elapsed time: %lld ms, avg elapsed:%f ms, total:%d\n", 100000, elapsed, elapsed / 100000.0, i); + prev = cur; + } + } + + int64_t e = taosGetTimestampMs(); + printf("total:%lld ms, avg:%f\n", e - s, (e - s) / (double)size); + printf("max level of skiplist:%d, actually level:%d\n ", pSkipList->maxLevel, pSkipList->level); + + assert(tSkipListGetSize(pSkipList) == size); + + printf("the level of skiplist is:\n"); + +// printf("level two------------------\n"); +// tSkipListPrint(pSkipList, 2); +// +// printf("level three------------------\n"); +// tSkipListPrint(pSkipList, 3); +// +// printf("level four------------------\n"); +// tSkipListPrint(pSkipList, 4); +// +// printf("level nine------------------\n"); +// tSkipListPrint(pSkipList, 10); + + int64_t st = taosGetTimestampMs(); +#if 0 + for (int32_t i = 0; i < 100000; i += 1) { + key.dKey = i * 0.997; + tSkipListRemove(pSkipList, &key); + } +#endif + + int64_t et = taosGetTimestampMs(); + printf("delete %d data from skiplist, elapased time:%" PRIu64 "ms\n", 10000, et - st); + assert(tSkipListGetSize(pSkipList) == size); + + tSkipListDestroy(pSkipList); + tfree(total); +} + +// todo not support duplicated key yet +void duplicatedKeyTest() { +#if 0 + SSkipListKey key; + key.nType = TSDB_DATA_TYPE_INT; + + SSkipListNode **pNodes = NULL; + + SSkipList *pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_INT, sizeof(int)); + + for (int32_t i = 0; i < 10000; ++i) { + for (int32_t j = 0; j < 5; ++j) { + key.i64Key = i; + tSkipListPut(pSkipList, "", &key, 1); + } + } + + tSkipListPrint(pSkipList, 1); + + for (int32_t i = 0; i < 100; ++i) { + key.i64Key = rand() % 1000; + int32_t size = tSkipListGets(pSkipList, &key, &pNodes); + + assert(size == 5); + + tfree(pNodes); + } + + tSkipListDestroy(pSkipList); +#endif +} + +} // namespace + +TEST(testCase, skiplist_test) { + assert(sizeof(SSkipListKey) == 8); + srand(time(NULL)); + + // stringKeySkiplistTest(); + // doubleSkipListTest(); + skiplistPerformanceTest(); + // duplicatedKeyTest(); + + // tSKipListQueryCond q; + // q.upperBndRelOptr = true; + // q.lowerBndRelOptr = true; + // q.upperBnd.nType = TSDB_DATA_TYPE_DOUBLE; + // q.lowerBnd.nType = TSDB_DATA_TYPE_DOUBLE; + // q.lowerBnd.dKey = 120; + // q.upperBnd.dKey = 171.989; + /* + int32_t size = tSkipListQuery(pSkipList, &q, &pNodes); + for (int32_t i = 0; i < size; ++i) { + printf("-----%lf\n", pNodes[i]->key.dKey); + } + printf("the range query result size is: %d\n", size); + tfree(pNodes); + + SSkipListKey *pKeys = malloc(sizeof(SSkipListKey) * 20); + for (int32_t i = 0; i < 8; i += 2) { + pKeys[i].dKey = i * 0.997; + pKeys[i].nType = TSDB_DATA_TYPE_DOUBLE; + printf("%lf ", pKeys[i].dKey); + } + + int32_t r = tSkipListPointQuery(pSkipList, pKeys, 8, EXCLUDE_POINT_QUERY, &pNodes); + printf("\nthe exclude query result is: %d\n", r); + for (int32_t i = 0; i < r; ++i) { + // printf("%lf ", pNodes[i]->key.dKey); + } + tfree(pNodes); + + free(pKeys);*/ + getchar(); +} diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 8c106d1067f92d9eabcb3dbf8eb89081a93f015a..0c85c5ef46548df5fd6d1d5866c2ddff411040a7 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -20,6 +20,7 @@ #include "dataformat.h" #include "taosdef.h" #include "tglobalcfg.h" +#include "tsdb.h" #ifdef __cplusplus extern "C" { @@ -148,6 +149,8 @@ typedef struct { SCompCol cols[]; } SCompData; +STsdbFileH* tsdbGetFile(tsdb_repo_t* pRepo); + int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols); int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 97c98efa05e0e742cb9ec1ea85d69ba3688d7f71..6d675d300a418b521fd5595396c25f3dfac88d55 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -583,6 +583,11 @@ STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) { return tsdb->tsdbMeta; } +STsdbFileH* tsdbGetFile(tsdb_repo_t* pRepo) { + STsdbRepo* tsdb = (STsdbRepo*) pRepo; + return tsdb->tsdbFileH; +} + // Check the configuration and set default options static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { // Check precision @@ -746,7 +751,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable if (pTable->mem == NULL) { pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); if (pTable->mem == NULL) return -1; - pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTupleKey); pTable->mem->keyFirst = INT64_MAX; pTable->mem->keyLast = 0; } @@ -769,7 +774,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable if (pTable->mem == NULL) { pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); if (pTable->mem == NULL) return -1; - pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, getTupleKey); pTable->mem->keyFirst = INT64_MAX; pTable->mem->keyLast = 0; } @@ -1189,7 +1194,7 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { return 0; } -static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { +int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { STsdbCfg * pCfg = &(pRepo->config); SCompData *pCompData = NULL; SFile * pFile = NULL; diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 1dd1c3b29d1dd1d805169e85c39c04d235ad1016..22680a839b28db0d26723df94eedcb58a1758abe 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -102,7 +102,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { if (pTable->type == TSDB_SUPER_TABLE) { pTable->pIndex = - tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey); + tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, 0, getTupleKey); } tsdbAddTableToMeta(pMeta, pTable, false); @@ -207,7 +207,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { super->tagSchema = tdDupSchema(pCfg->tagSchema); super->tagVal = tdDataRowDup(pCfg->tagValues); super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, - 0, getTupleKey); // Allow duplicate key, no lock + 0, 0, getTupleKey); // Allow duplicate key, no lock if (super->pIndex == NULL) { tdFreeSchema(super->schema); diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 7b996c8f7a170fdf278c1cb844ca85686f65fa6a..2919b2cf9e9229ecc7bf558cb484833d5dc2cb28 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -13,8 +13,9 @@ * along with this program. If not, see . */ -#include #include "os.h" + +#include "tlog.h" #include "tutil.h" #include "../../../query/inc/qast.h" @@ -28,6 +29,11 @@ #define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC) #define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns)) +enum { + QUERY_RANGE_LESS_EQUAL = 0, + QUERY_RANGE_GREATER_EQUAL = 1, +}; + typedef struct SField { // todo need the definition } SField; @@ -36,12 +42,12 @@ typedef struct SHeaderFileInfo { int32_t fileId; } SHeaderFileInfo; -typedef struct SQueryHandlePos { - int32_t fileId; +typedef struct SQueryFilePos { + int32_t fid; int32_t slot; int32_t pos; - int32_t fileIndex; -} SQueryHandlePos; + int64_t lastKey; +} SQueryFilePos; typedef struct SDataBlockLoadInfo { int32_t fileListIndex; @@ -78,8 +84,12 @@ typedef struct STableCheckInfo { TSKEY lastKey; STable * pTableObj; int64_t offsetInHeaderFile; - int32_t numOfBlocks; +// int32_t numOfBlocks; int32_t start; + bool checkFirstFileBlock; + + SCompIdx* compIndex; + SCompBlock *pBlock; SSkipListIterator* iter; } STableCheckInfo; @@ -104,8 +114,8 @@ enum { typedef struct STsdbQueryHandle { struct STsdbRepo* pTsdb; int8_t model; // access model, single table model or multi-table model - SQueryHandlePos cur; // current position - SQueryHandlePos start; // the start position, used for secondary/third iteration + SQueryFilePos cur; // current position + SQueryFilePos start; // the start position, used for secondary/third iteration int32_t unzipBufSize; char *unzipBuffer; char *secondaryUnzipBuffer; @@ -342,6 +352,337 @@ static bool hasMoreDataInCacheForSingleModel(STsdbQueryHandle* pHandle) { return true; } +// todo dynamic get the daysperfile +static int32_t getFileIdFromKey(TSKEY key) { + return (int32_t)(key / 10); // set the starting fileId +} + +static int32_t getFileCompInfo(STableCheckInfo* pCheckInfo, SFileGroup* fileGroup) { + tsdbLoadCompIdx(fileGroup, pCheckInfo->compIndex, 10000); // todo set dynamic max tables + SCompIdx* compIndex = &pCheckInfo->compIndex[pCheckInfo->tableId.tid]; + + if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file + + } else { + tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pBlock); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { + int32_t firstSlot = 0; + int32_t lastSlot = numOfBlocks - 1; + + int32_t midSlot = firstSlot; + + while (1) { + numOfBlocks = lastSlot - firstSlot + 1; + midSlot = (firstSlot + (numOfBlocks >> 1)); + + if (numOfBlocks == 1) break; + + if (skey > pBlock[midSlot].keyLast) { + if (numOfBlocks == 2) break; + if ((order == TSQL_SO_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break; + firstSlot = midSlot + 1; + } else if (skey < pBlock[midSlot].keyFirst) { + if ((order == TSQL_SO_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break; + lastSlot = midSlot - 1; + } else { + break; // got the slot + } + } + + return midSlot; +} + +static SDataBlockInfo getTrueBlockInfo(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) { + SDataBlockInfo info = {{0}, 0}; + + SCompBlock *pDiskBlock = &pCheckInfo->pBlock[pHandle->cur.slot]; + + info.window.skey = pDiskBlock->keyFirst; + info.window.ekey = pDiskBlock->keyLast; + info.size = pDiskBlock->numOfPoints; + info.numOfCols = pDiskBlock->numOfCols; + + return info; +} + +bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { + SQueryFilePos *cur = &pQueryHandle->cur; + + if (pQueryHandle->cur.fid >= 0) { + int32_t fileIndex = -1; + + /* + * 1. ascending order. The last data block of data file + * 2. descending order. The first block of file + */ + if ((step == QUERY_ASC_FORWARD_STEP && (pQueryHandle->cur.slot == pQueryHandle->numOfBlocks - 1)) || + (step == QUERY_DESC_FORWARD_STEP && (pQueryHandle->cur.slot == 0))) { + // temporarily keep the position value, in case of no data qualified when move forwards(backwards) + SQueryFilePos save = pQueryHandle->cur; + +// fileIndex = getNextDataFileCompInfo(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step); + + // first data block in the next file + if (fileIndex >= 0) { + cur->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->numOfBlocks - 1; + cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->pBlock[cur->slot].numOfPoints - 1; +// return loadQaulifiedData(pQueryHandle); + } else {// try data in cache + assert(cur->fid == -1); + + if (step == QUERY_ASC_FORWARD_STEP) { +// TSKEY nextTimestamp = +// getQueryStartPositionInCache_rv(pQueryHandle, &pQueryHandle->cur.slot, &pQueryHandle->cur.pos, true); +// if (nextTimestamp < 0) { +// pQueryHandle->cur = save; +// } +// +// return (nextTimestamp > 0); + } + + // no data to check for desc order query, restore the saved position value + pQueryHandle->cur = save; + return false; + } + } + + // next block in the same file + int32_t fid = cur->fid; +// fileIndex = vnodeGetVnodeHeaderFileIndex(&fid, pQueryHandle->order, &pQueryHandle->vnodeFileInfo); + cur->slot += step; + + SCompBlock *pBlock = &pQueryHandle->pBlock[cur->slot]; + cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1; +// return loadQaulifiedData(pQueryHandle); + } else { // data in cache + return hasMoreDataInCacheForSingleModel(pQueryHandle); + } +} + +int vnodeBinarySearchKey(char *pValue, int num, TSKEY key, int order) { + int firstPos, lastPos, midPos = -1; + int numOfPoints; + TSKEY *keyList; + + if (num <= 0) return -1; + + keyList = (TSKEY *)pValue; + firstPos = 0; + lastPos = num - 1; + + if (order == 0) { + // find the first position which is smaller than the key + while (1) { + if (key >= keyList[lastPos]) return lastPos; + if (key == keyList[firstPos]) return firstPos; + if (key < keyList[firstPos]) return firstPos - 1; + + numOfPoints = lastPos - firstPos + 1; + midPos = (numOfPoints >> 1) + firstPos; + + if (key < keyList[midPos]) { + lastPos = midPos - 1; + } else if (key > keyList[midPos]) { + firstPos = midPos + 1; + } else { + break; + } + } + + } else { + // find the first position which is bigger than the key + while (1) { + if (key <= keyList[firstPos]) return firstPos; + if (key == keyList[lastPos]) return lastPos; + + if (key > keyList[lastPos]) { + lastPos = lastPos + 1; + if (lastPos >= num) + return -1; + else + return lastPos; + } + + numOfPoints = lastPos - firstPos + 1; + midPos = (numOfPoints >> 1) + firstPos; + + if (key < keyList[midPos]) { + lastPos = midPos - 1; + } else if (key > keyList[midPos]) { + firstPos = midPos + 1; + } else { + break; + } + } + } + + return midPos; +} + +static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) { + // only return the qualified data to client in terms of query time window, data rows in the same block but do not + // be included in the query time window will be discarded + SQueryFilePos *cur = &pQueryHandle->cur; + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); + SDataBlockInfo blockInfo = getTrueBlockInfo(pQueryHandle, pCheckInfo); + + int32_t endPos = cur->pos; + if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { + endPos = blockInfo.size - 1; + pQueryHandle->realNumOfRows = endPos - cur->pos + 1; + } else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { + endPos = 0; + pQueryHandle->realNumOfRows = cur->pos + 1; + } else { +// endPos = vnodeBinarySearchKey(pQueryHandle->tsBuf->data, blockInfo.size, pQueryHandle->window.ekey, pQueryHandle->order); + + if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + if (endPos < cur->pos) { + pQueryHandle->realNumOfRows = 0; + return; + } else { + pQueryHandle->realNumOfRows = endPos - cur->pos; + } + } else { + if (endPos > cur->pos) { + pQueryHandle->realNumOfRows = 0; + return; + } else { + pQueryHandle->realNumOfRows = cur->pos - endPos; + } + } + } + + int32_t start = MIN(cur->pos, endPos); + + // move the data block in the front to data block if needed + if (start != 0) { + int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); + + for (int32_t i = 0; i < taosArrayGetSize(sa); ++i) { + int16_t colId = *(int16_t *)taosArrayGet(sa, i); + + for (int32_t j = 0; j < numOfCols; ++j) { + SColumnInfoEx *pCol = taosArrayGet(pQueryHandle->pColumns, j); + + if (pCol->info.colId == colId) { + memmove(pCol->pData, ((char *)pCol->pData) + pCol->info.bytes * start, pQueryHandle->realNumOfRows * pCol->info.bytes); + break; + } + } + } + } + + assert(pQueryHandle->realNumOfRows <= blockInfo.size); + + // forward(backward) the position for cursor + cur->pos = endPos; +} + +static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInfo* pCheckInfo, int32_t type) { + STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb); + int32_t fid = getFileIdFromKey(pCheckInfo->lastKey); + + SFileGroup* fileGroup = tsdbSearchFGroup(pFileHandle, fid); + if (fileGroup == NULL) { + return false; + } + + SQueryFilePos* cur = &pQueryHandle->cur; + + TSKEY key = pCheckInfo->lastKey; + int32_t index = -1; + + // todo add iterator for filegroup + while (1) { + if ((fid = getFileCompInfo(pCheckInfo, fileGroup)) < 0) { + break; + } + + int32_t tid = pCheckInfo->tableId.tid; + index = binarySearchForBlockImpl(pCheckInfo->pBlock, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key); + + if (type == QUERY_RANGE_GREATER_EQUAL) { + if (key <= pCheckInfo->pBlock[index].keyLast) { + break; + } else { + index = -1; + } + } else { + if (key >= pCheckInfo->pBlock[index].keyFirst) { + break; + } else { + index = -1; + } + } + } + + // failed to find qualified point in file, abort + if (index == -1) { + return false; + } + + assert(index >= 0 && index < pQueryHandle->numOfBlocks); + + // load first data block into memory failed, caused by disk block error + bool blockLoaded = false; + SArray *sa = NULL; + + // todo no need to loaded at all + cur->slot = index; + +// sa = getDefaultLoadColumns(pQueryHandle, true); + if (tsdbLoadDataBlock(&fileGroup->files[2], &pCheckInfo->pBlock[cur->slot], 1, fid, sa) == 0) { + blockLoaded = true; + } + + // dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files", + // GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx); + + // failed to load data from disk, abort current query + if (blockLoaded == false) { + return false; + } + + // todo search qualified points in blk, according to primary key (timestamp) column +// cur->pos = binarySearchForBlockImpl(ptsBuf->data, pBlocks->numOfPoints, key, pQueryHandle->order); + assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0); + + filterDataInDataBlock(pQueryHandle, sa); + return pQueryHandle->realNumOfRows > 0; +} + +static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) { + assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); + + STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); + SQueryFilePos* cur = &pHandle->cur; + + STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); + + if (!pCheckInfo->checkFirstFileBlock) { + pCheckInfo->checkFirstFileBlock = true; + + if (pFileHandle != NULL) { + bool found = getQualifiedDataBlock(pHandle, pCheckInfo, 1); + if (found) { + return true; + } + } + + // no data in file, try cache + pHandle->cur.fid = -1; + return hasMoreDataInCacheForSingleModel(pHandle); + } else { // move to next data block in file or in cache + return moveToNextBlock(pHandle, 1); + } +} + static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) { size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo); assert(numOfTables > 0); @@ -372,7 +713,7 @@ static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) { bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; if (pHandle->model == SINGLE_TABLE_MODEL) { - return hasMoreDataInCacheForSingleModel(pHandle); + return hasMoreDataInFileForSingleTableModel(pHandle); } else { return hasMoreDataInCacheForMultiModel(pHandle); } @@ -704,8 +1045,6 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); - char buf[TSDB_MAX_TAGS_LEN] = {0}; - char* val = dataRowTuple(pTable->tagVal); // todo not only the first column int8_t type = pInfo->sch.type; @@ -765,9 +1104,11 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond // query according to the binary expression SSyntaxTreeFilterSupporter s = {.pTagSchema = stcol, .numOfTags = schemaNCols(pSTable->tagSchema)}; - SBinaryFilterSupp supp = {.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, + SBinaryFilterSupp supp = { + .fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = (__do_filter_suppl_fn_t)filterPrepare, - .pExtInfo = &s}; + .pExtInfo = &s + }; tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp); tSQLBinaryExprDestroy(&pExpr, tSQLListTraverseDestroyInfo); diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 6fcedb8123cb2ea8e6bb6b89a3b7d45c96bcd3f6..b621781a3c81e4155841ac5d4ec31aae6a38518e 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -46,7 +46,8 @@ int main(int argc, char *argv[]) { } printf("success to connect to server\n"); - int32_t code = taos_query(taos, "select * from test.t1"); +// int32_t code = taos_query(taos, "insert into test.tm2 values(now, 1)(now+1m,2)(now+2m,3) (now+3m, 4) (now+4m, 5);"); + int32_t code = taos_query(taos, "insert into test.tm2 values(now, 99)"); if (code != 0) { printf("failed to execute query, reason:%s\n", taos_errstr(taos)); }