diff --git a/src/client/src/tscAst.c b/src/client/src/tscAst.c index 22100bc1d17c71af57b1230e2421e135b0c9eec6..17e7667445da7eb67dbd1153faf335012411aa56 100644 --- a/src/client/src/tscAst.c +++ b/src/client/src/tscAst.c @@ -21,7 +21,7 @@ #include "tscSyntaxtreefunction.h" #include "tschemautil.h" #include "taosdef.h" -#include "tskiplist.h" +#include "sskiplist.h" #include "tsqldef.h" #include "tsqlfunction.h" #include "tstoken.h" diff --git a/src/util/inc/sskiplist.h b/src/util/inc/sskiplist.h new file mode 100644 index 0000000000000000000000000000000000000000..28300a69b4cc1688b205e1c3056a85ef21849d63 --- /dev/null +++ b/src/util/inc/sskiplist.h @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TBASE_TSKIPLIST_H +#define TBASE_TSKIPLIST_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define MAX_SKIP_LIST_LEVEL 20 + +#include +#include +#include + +#include "os.h" +#include "ttypes.h" + +/* + * key of each node + * todo move to as the global structure in all search codes... + */ + +const static size_t SKIP_LIST_STR_KEY_LENGTH_THRESHOLD = 15; +typedef tVariant tSkipListKey; + +typedef enum tSkipListPointQueryType { + INCLUDE_POINT_QUERY, + EXCLUDE_POINT_QUERY, +} tSkipListPointQueryType; + +typedef struct tSkipListNode { + uint16_t nLevel; + char * pData; + + struct tSkipListNode **pForward; + struct tSkipListNode **pBackward; + + tSkipListKey key; +} tSkipListNode; + +/* + * @version 0.2 + * @date 2017/11/12 + * the simple version of SkipList. + * for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate + * deterministic result. Later, we will remove the lock in SkipList to further + * enhance the performance. In this case, one should use the concurrent skip list (by + * using michael-scott algorithm) instead of this simple version in a multi-thread + * environment, to achieve higher performance of read/write operations. + * + * Note: Duplicated primary key situation. + * In case of duplicated primary key, two ways can be employed to handle this situation: + * 1. add as normal insertion with out special process. + * 2. add an overflow pointer at each list node, all nodes with the same key will be added + * in the overflow pointer. In this case, the total steps of each search will be reduced significantly. + * Currently, we implement the skip list in a line with the first means, maybe refactor it soon. + * + * Memory consumption: the memory alignment causes many memory wasted. So, employ a memory + * pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs. + * + * 3. use the iterator pattern to refactor all routines to make it more clean + */ + +// state struct, record following information: +// number of links in each level. +// avg search steps, for latest 1000 queries +// avg search rsp time, for latest 1000 queries +// total memory size +typedef struct tSkipListState { + // in bytes, sizeof(tSkipList)+sizeof(tSkipListNode)*tSkipList->nSize + uint64_t nTotalMemSize; + uint64_t nLevelNodeCnt[MAX_SKIP_LIST_LEVEL]; + uint64_t queryCount; // total query count + + /* + * only record latest 1000 queries + * when the value==1000, = 0, + * nTotalStepsForQueries = 0, + * nTotalElapsedTimeForQueries = 0 + */ + uint64_t nRecQueries; + uint16_t nTotalStepsForQueries; + uint64_t nTotalElapsedTimeForQueries; + + uint16_t nInsertObjs; + uint16_t nTotalStepsForInsert; + uint64_t nTotalElapsedTimeForInsert; +} tSkipListState; + +typedef struct tSkipList { + tSkipListNode pHead; + uint64_t nSize; + uint16_t nMaxLevel; + uint16_t nLevel; + uint16_t keyType; + uint16_t nMaxKeyLen; + + __compar_fn_t comparator; + pthread_rwlock_t lock; // will be removed soon + tSkipListState state; // skiplist state +} tSkipList; + +/* + * iterate the skiplist + * this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may + * continue iterating the skiplist, so add the reference count for skiplist + * TODO add the ref for skiplist when one iterator is created + */ +typedef struct SSkipListIterator { + tSkipList * pSkipList; + tSkipListNode *cur; + int64_t num; +} SSkipListIterator; + +/* + * query condition structure to denote the range query + * todo merge the point query cond with range query condition + */ +typedef struct tSKipListQueryCond { + // when the upper bounding == lower bounding, it is a point query + tSkipListKey lowerBnd; + tSkipListKey upperBnd; + int32_t lowerBndRelOptr; // relation operator to denote if lower bound is + int32_t upperBndRelOptr; // included or not +} tSKipListQueryCond; + +tSkipList *SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen); + +void *SSkipListDestroy(tSkipList *pSkipList); + +// create skip list key +tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength); + +// destroy skip list key +void tSkipListDestroyKey(tSkipListKey *pKey); + +// put data into skiplist +tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey); + +/* + * get only *one* node of which key is equalled to pKey, even there are more + * than one nodes are of the same key + */ +tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey); + +/* + * get all data with the same keys + */ +int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes); + +int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *), + void *param); + +/* + * remove only one node of the pKey value. + * If more than one node has the same value, any one will be removed + * + * @Return + * true: one node has been removed + * false: no node has been removed + */ +bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey); + +/* + * remove the specified node in parameters + */ +void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode); + +// for debug purpose only +void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel); + +/* + * range query & single point query function + */ +int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult); + +/* + * include/exclude point query + */ +int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type, + tSkipListNode ***pResult); + +int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator *iter); +bool tSkipListIteratorNext(SSkipListIterator *iter); +tSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter); + +#ifdef __cplusplus +} +#endif + +#endif // TBASE_TSKIPLIST_H diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index ca9cc330c84bf7e3065bf75d3daa57312b284426..ecdbc38a0573678303f834545c9677ef9e7e1b4e 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -54,7 +54,7 @@ typedef struct SSkipListNode { #define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level)) #define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n))) -#define SL_GET_NODE_LEVEL(n) *(int32_t *)((n)) +#define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n)) /* * @version 0.3 diff --git a/src/util/src/sskiplist.c b/src/util/src/sskiplist.c new file mode 100644 index 0000000000000000000000000000000000000000..47948e1660fb29aeffe5189ea8baa54f903dabff --- /dev/null +++ b/src/util/src/sskiplist.c @@ -0,0 +1,844 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "os.h" + +#include "tlog.h" +#include "taosdef.h" +#include "sskiplist.h" +#include "tutil.h" + +static FORCE_INLINE void recordNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { // record link count in each level + for (int32_t i = 0; i < nLevel; ++i) { + pSkipList->state.nLevelNodeCnt[i]++; + } +} + +static FORCE_INLINE void removeNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { + for (int32_t i = 0; i < nLevel; ++i) { + pSkipList->state.nLevelNodeCnt[i]--; + } +} + +static FORCE_INLINE int32_t getSkipListNodeRandomHeight(tSkipList *pSkipList) { + const uint32_t factor = 4; + + int32_t n = 1; + while ((rand() % factor) == 0 && n <= pSkipList->nMaxLevel) { + n++; + } + + return n; +} + +static FORCE_INLINE int32_t getSkipListNodeLevel(tSkipList *pSkipList) { + int32_t nLevel = getSkipListNodeRandomHeight(pSkipList); + if (pSkipList->nSize == 0) { + nLevel = 1; + pSkipList->nLevel = 1; + } else { + if (nLevel > pSkipList->nLevel && pSkipList->nLevel < pSkipList->nMaxLevel) { + nLevel = (++pSkipList->nLevel); + } + } + return nLevel; +} + +void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode); + +void SSkipListDoRecordPut(tSkipList *pSkipList) { + const int32_t MAX_RECORD_NUM = 1000; + + if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) { + pSkipList->state.nInsertObjs = 1; + pSkipList->state.nTotalStepsForInsert = 0; + pSkipList->state.nTotalElapsedTimeForInsert = 0; + } else { + pSkipList->state.nInsertObjs++; + } +} + +int32_t compareIntVal(const void *pLeft, const void *pRight) { + int64_t lhs = ((tSkipListKey *)pLeft)->i64Key; + int64_t rhs = ((tSkipListKey *)pRight)->i64Key; + + DEFAULT_COMP(lhs, rhs); +} + +int32_t scompareIntDoubleVal(const void *pLeft, const void *pRight) { + int64_t lhs = ((tSkipListKey *)pLeft)->i64Key; + double rhs = ((tSkipListKey *)pRight)->dKey; + if (fabs(lhs - rhs) < FLT_EPSILON) { + return 0; + } else { + return (lhs > rhs) ? 1 : -1; + } +} + +int32_t scompareDoubleIntVal(const void *pLeft, const void *pRight) { + double lhs = ((tSkipListKey *)pLeft)->dKey; + int64_t rhs = ((tSkipListKey *)pRight)->i64Key; + if (fabs(lhs - rhs) < FLT_EPSILON) { + return 0; + } else { + return (lhs > rhs) ? 1 : -1; + } +} + +int32_t scompareDoubleVal(const void *pLeft, const void *pRight) { + double ret = (((tSkipListKey *)pLeft)->dKey - ((tSkipListKey *)pRight)->dKey); + if (fabs(ret) < FLT_EPSILON) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +int32_t scompareStrVal(const void *pLeft, const void *pRight) { + tSkipListKey *pL = (tSkipListKey *)pLeft; + tSkipListKey *pR = (tSkipListKey *)pRight; + + if (pL->nLen == 0 && pR->nLen == 0) { + return 0; + } + + //handle only one-side bound compare situation, there is only lower bound or only upper bound + if (pL->nLen == -1) { + return 1; // no lower bound, lower bound is minimum, always return -1; + } else if (pR->nLen == -1) { + return -1; // no upper bound, upper bound is maximum situation, always return 1; + } + + int32_t ret = strcmp(((tSkipListKey *)pLeft)->pz, ((tSkipListKey *)pRight)->pz); + + if (ret == 0) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +int32_t scompareWStrVal(const void *pLeft, const void *pRight) { + tSkipListKey *pL = (tSkipListKey *)pLeft; + tSkipListKey *pR = (tSkipListKey *)pRight; + + if (pL->nLen == 0 && pR->nLen == 0) { + return 0; + } + + //handle only one-side bound compare situation, there is only lower bound or only upper bound + if (pL->nLen == -1) { + return 1; // no lower bound, lower bound is minimum, always return -1; + } else if (pR->nLen == -1) { + return -1; // no upper bound, upper bound is maximum situation, always return 1; + } + + int32_t ret = wcscmp(((tSkipListKey *)pLeft)->wpz, ((tSkipListKey *)pRight)->wpz); + + if (ret == 0) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +static __compar_fn_t getKeyFilterComparator(tSkipList *pSkipList, int32_t filterDataType) { + __compar_fn_t comparator = NULL; + + switch (pSkipList->keyType) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_BOOL: { + if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { + comparator = compareIntVal; + } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { + comparator = scompareIntDoubleVal; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: { + if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { + comparator = scompareDoubleIntVal; + } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { + comparator = scompareDoubleVal; + } + break; + } + case TSDB_DATA_TYPE_BINARY: + comparator = scompareStrVal; + break; + case TSDB_DATA_TYPE_NCHAR: + comparator = scompareWStrVal; + break; + default: + comparator = compareIntVal; + break; + } + + return comparator; +} + +static __compar_fn_t getKeyComparator(int32_t keyType) { + __compar_fn_t comparator = NULL; + + switch (keyType) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_BOOL: + comparator = compareIntVal; + break; + + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + comparator = scompareDoubleVal; + break; + + case TSDB_DATA_TYPE_BINARY: + comparator = scompareStrVal; + break; + + case TSDB_DATA_TYPE_NCHAR: + comparator = scompareWStrVal; + break; + + default: + comparator = compareIntVal; + break; + } + + return comparator; +} + +tSkipList* SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen) { + tSkipList *pSkipList = (tSkipList *)calloc(1, sizeof(tSkipList)); + if (pSkipList == NULL) { + return NULL; + } + + pSkipList->keyType = keyType; + + pSkipList->comparator = getKeyComparator(keyType); + pSkipList->pHead.pForward = (tSkipListNode **)calloc(1, POINTER_BYTES * MAX_SKIP_LIST_LEVEL); + + pSkipList->nMaxLevel = MAX_SKIP_LIST_LEVEL; + pSkipList->nLevel = 1; + + pSkipList->nMaxKeyLen = nMaxKeyLen; + pSkipList->nMaxLevel = nMaxLevel; + + if (pthread_rwlock_init(&pSkipList->lock, NULL) != 0) { + tfree(pSkipList->pHead.pForward); + tfree(pSkipList); + return NULL; + } + + srand(time(NULL)); + pSkipList->state.nTotalMemSize += sizeof(tSkipList); + return pSkipList; +} + +static void doRemove(tSkipList *pSkipList, tSkipListNode *pNode, tSkipListNode *forward[]) { + int32_t level = pNode->nLevel; + for (int32_t j = level - 1; j >= 0; --j) { + if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) { + forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j]; + } + + if (forward[j]->pForward[j] != NULL) { + forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j]; + } + } + + pSkipList->state.nTotalMemSize -= (sizeof(tSkipListNode) + POINTER_BYTES * pNode->nLevel * 2); + removeNodeEachLevel(pSkipList, pNode->nLevel); + + tfree(pNode); + --pSkipList->nSize; +} + +static size_t getOneNodeSize(const tSkipListKey *pKey, int32_t nLevel) { + size_t size = sizeof(tSkipListNode) + sizeof(intptr_t) * (nLevel << 1); + if (pKey->nType == TSDB_DATA_TYPE_BINARY) { + size += pKey->nLen + 1; + } else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) { + size += (pKey->nLen + 1) * TSDB_NCHAR_SIZE; + } + + return size; +} + +static tSkipListNode *SSkipListCreateNode(void *pData, const tSkipListKey *pKey, int32_t nLevel) { + size_t nodeSize = getOneNodeSize(pKey, nLevel); + tSkipListNode *pNode = (tSkipListNode *)calloc(1, nodeSize); + + pNode->pForward = (tSkipListNode **)(&pNode[1]); + pNode->pBackward = (pNode->pForward + nLevel); + + pNode->pData = pData; + + pNode->key = *pKey; + if (pKey->nType == TSDB_DATA_TYPE_BINARY) { + pNode->key.pz = (char *)(pNode->pBackward + nLevel); + + strcpy(pNode->key.pz, pKey->pz); + pNode->key.pz[pKey->nLen] = 0; + } else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) { + pNode->key.wpz = (wchar_t *)(pNode->pBackward + nLevel); + wcsncpy(pNode->key.wpz, pKey->wpz, pKey->nLen); + pNode->key.wpz[pKey->nLen] = 0; + } + + pNode->nLevel = nLevel; + return pNode; +} + +tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength) { + tSkipListKey k = {0}; + tVariantCreateFromBinary(&k, val, (uint32_t) keyLength, (uint32_t) type); + return k; +} + +void tSkipListDestroyKey(tSkipListKey *pKey) { tVariantDestroy(pKey); } + +void* SSkipListDestroy(tSkipList *pSkipList) { + if (pSkipList == NULL) { + return NULL; + } + + pthread_rwlock_wrlock(&pSkipList->lock); + tSkipListNode *pNode = pSkipList->pHead.pForward[0]; + while (pNode) { + tSkipListNode *pTemp = pNode; + pNode = pNode->pForward[0]; + tfree(pTemp); + } + + tfree(pSkipList->pHead.pForward); + pthread_rwlock_unlock(&pSkipList->lock); + + pthread_rwlock_destroy(&pSkipList->lock); + tfree(pSkipList); + + return NULL; +} + +tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey) { + if (pSkipList == NULL) { + return NULL; + } + + pthread_rwlock_wrlock(&pSkipList->lock); + + // record one node is put into skiplist + SSkipListDoRecordPut(pSkipList); + + tSkipListNode *px = &pSkipList->pHead; + + tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) { + while (px->pForward[i] != NULL && (pSkipList->comparator(&px->pForward[i]->key, pKey) < 0)) { + px = px->pForward[i]; + } + + pSkipList->state.nTotalStepsForInsert++; + forward[i] = px; + } + + // if the skiplist does not allowed identical key inserted, the new data will be discarded. + if ((insertIdenticalKey == 0) && forward[0] != &pSkipList->pHead && + (pSkipList->comparator(&forward[0]->key, pKey) == 0)) { + pthread_rwlock_unlock(&pSkipList->lock); + return forward[0]; + } + + int32_t nLevel = getSkipListNodeLevel(pSkipList); + recordNodeEachLevel(pSkipList, nLevel); + + tSkipListNode *pNode = SSkipListCreateNode(pData, pKey, nLevel); + tSkipListDoInsert(pSkipList, forward, nLevel, pNode); + + pSkipList->nSize += 1; + + // char tmpstr[512] = {0}; + // tVariantToString(&pNode->key, tmpstr); + // pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList, + // tmpstr, pSkipList->nSize); + + pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, nLevel); + pthread_rwlock_unlock(&pSkipList->lock); + + return pNode; +} + +void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode) { + for (int32_t i = 0; i < nLevel; ++i) { + tSkipListNode *x = forward[i]; + if (x != NULL) { + pNode->pBackward[i] = x; + if (x->pForward[i]) x->pForward[i]->pBackward[i] = pNode; + + pNode->pForward[i] = x->pForward[i]; + x->pForward[i] = pNode; + } else { + pSkipList->pHead.pForward[i] = pNode; + pNode->pBackward[i] = &(pSkipList->pHead); + } + } +} + +tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey) { + int32_t sLevel = pSkipList->nLevel - 1; + int32_t ret = -1; + + tSkipListNode *x = &pSkipList->pHead; + + pthread_rwlock_rdlock(&pSkipList->lock); + pSkipList->state.queryCount++; + + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + for (int32_t i = sLevel; i >= 0; --i) { + while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) { + x = x->pForward[i]; + } + + if (ret == 0) { + pthread_rwlock_unlock(&pSkipList->lock); + return x->pForward[i]; + } + } + + pthread_rwlock_unlock(&pSkipList->lock); + return NULL; +} + +static int32_t tSkipListEndParQuery(tSkipList *pSkipList, tSkipListNode *pStartNode, tSkipListKey *pEndKey, + int32_t cond, tSkipListNode ***pRes) { + pthread_rwlock_rdlock(&pSkipList->lock); + tSkipListNode *p = pStartNode; + int32_t numOfRes = 0; + + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pEndKey->nType); + while (p != NULL) { + int32_t ret = filterComparator(&p->key, pEndKey); + if (ret > 0) { + break; + } + + if (ret < 0) { + numOfRes++; + p = p->pForward[0]; + } else if (ret == 0) { + if (cond == TSDB_RELATION_LESS_EQUAL) { + numOfRes++; + p = p->pForward[0]; + } else { + break; + } + } + } + + (*pRes) = (tSkipListNode **)malloc(POINTER_BYTES * numOfRes); + for (int32_t i = 0; i < numOfRes; ++i) { + (*pRes)[i] = pStartNode; + pStartNode = pStartNode->pForward[0]; + } + pthread_rwlock_unlock(&pSkipList->lock); + + return numOfRes; +} + +/* + * maybe return the copy of tSkipListNode would be better + */ +int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes) { + (*pRes) = NULL; + + tSkipListNode *pNode = tSkipListGetOne(pSkipList, pKey); + if (pNode == NULL) { + return 0; + } + + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + // backward check if previous nodes are with the same value. + tSkipListNode *pPrev = pNode->pBackward[0]; + while ((pPrev != &pSkipList->pHead) && filterComparator(&pPrev->key, pKey) == 0) { + pPrev = pPrev->pBackward[0]; + } + + return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes); +} + +static tSkipListNode *tSkipListParQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t cond) { + int32_t sLevel = pSkipList->nLevel - 1; + int32_t ret = -1; + + tSkipListNode *x = &pSkipList->pHead; + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + pthread_rwlock_rdlock(&pSkipList->lock); + + if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) { + for (int32_t i = sLevel; i >= 0; --i) { + while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) { + x = x->pForward[i]; + } + } + + // backward check if previous nodes are with the same value. + if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) { + tSkipListNode *pNode = x->pForward[0]; + while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparator(&pNode->pBackward[0]->key, pKey) == 0)) { + pNode = pNode->pBackward[0]; + } + pthread_rwlock_unlock(&pSkipList->lock); + return pNode; + } + + if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) { + pthread_rwlock_unlock(&pSkipList->lock); + return x->pForward[0]; + } else { // cond == TSDB_RELATION_LARGE && ret == 0 + tSkipListNode *pn = x->pForward[0]; + while (pn != NULL && filterComparator(&pn->key, pKey) == 0) { + pn = pn->pForward[0]; + } + pthread_rwlock_unlock(&pSkipList->lock); + return pn; + } + } + + pthread_rwlock_unlock(&pSkipList->lock); + return NULL; +} + +int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *), + void *param) { + (*pRes) = (tSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize); + if (NULL == *pRes) { + pError("error skiplist %p, malloc failed", pSkipList); + return -1; + } + + pthread_rwlock_rdlock(&pSkipList->lock); + tSkipListNode *pStartNode = pSkipList->pHead.pForward[0]; + int32_t num = 0; + + for (int32_t i = 0; i < pSkipList->nSize; ++i) { + if (pStartNode == NULL) { + pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1); +#ifdef _DEBUG_VIEW + SSkipListPrint(pSkipList, 1); +#endif + break; + } + + if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) { + (*pRes)[num++] = pStartNode; + } + + pStartNode = pStartNode->pForward[0]; + } + + pthread_rwlock_unlock(&pSkipList->lock); + + if (num == 0) { + free(*pRes); + *pRes = NULL; + } else if (num < pSkipList->nSize) { // free unused memory + char* tmp = realloc((*pRes), num * POINTER_BYTES); + assert(tmp != NULL); + + *pRes = (tSkipListNode**)tmp; + } + + return num; +} + +int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator* iter) { + if (pSkipList == NULL) { + return -1; + } + + iter->pSkipList = pSkipList; + + pthread_rwlock_rdlock(&pSkipList->lock); + iter->cur = NULL;//pSkipList->pHead.pForward[0]; + iter->num = pSkipList->nSize; + pthread_rwlock_unlock(&pSkipList->lock); + + return 0; +} + +bool tSkipListIteratorNext(SSkipListIterator* iter) { + if (iter->num == 0 || iter->pSkipList == NULL) { + return false; + } + + tSkipList* pSkipList = iter->pSkipList; + + pthread_rwlock_rdlock(&pSkipList->lock); + if (iter->cur == NULL) { + iter->cur = pSkipList->pHead.pForward[0]; + } else { + iter->cur = iter->cur->pForward[0]; + } + + pthread_rwlock_unlock(&pSkipList->lock); + + return iter->cur != NULL; +} + +tSkipListNode* tSkipListIteratorGet(SSkipListIterator* iter) { + return iter->cur; +} + +int32_t tSkipListRangeQuery(tSkipList *pSkipList, tSKipListQueryCond *pCond, tSkipListNode ***pRes) { + pSkipList->state.queryCount++; + tSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr); + if (pStart == 0) { + *pRes = NULL; + return 0; + } + + return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes); +} + +static bool removeSupport(tSkipList *pSkipList, tSkipListNode **forward, tSkipListKey *pKey) { + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + if (filterComparator(&forward[0]->pForward[0]->key, pKey) == 0) { + tSkipListNode *p = forward[0]->pForward[0]; + doRemove(pSkipList, p, forward); + } else { // failed to find the node of specified value,abort + return false; + } + + // compress the minimum level of skip list + while (pSkipList->nLevel > 0 && pSkipList->pHead.pForward[pSkipList->nLevel - 1] == NULL) { + pSkipList->nLevel -= 1; + } + + return true; +} + +void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode) { + tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + + pthread_rwlock_rdlock(&pSkipList->lock); + for (int32_t i = 0; i < pNode->nLevel; ++i) { + forward[i] = pNode->pBackward[i]; + } + + removeSupport(pSkipList, forward, &pNode->key); + pthread_rwlock_unlock(&pSkipList->lock); +} + +bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey) { + tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + pthread_rwlock_rdlock(&pSkipList->lock); + + tSkipListNode *x = &pSkipList->pHead; + for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) { + while (x->pForward[i] != NULL && (filterComparator(&x->pForward[i]->key, pKey) < 0)) { + x = x->pForward[i]; + } + forward[i] = x; + } + + bool ret = removeSupport(pSkipList, forward, pKey); + pthread_rwlock_unlock(&pSkipList->lock); + + return ret; +} + +void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel) { + if (pSkipList == NULL || pSkipList->nLevel < nlevel || nlevel <= 0) { + return; + } + + tSkipListNode *p = pSkipList->pHead.pForward[nlevel - 1]; + int32_t id = 1; + while (p) { + switch (pSkipList->keyType) { + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_BIGINT: + fprintf(stdout, "%d: %" PRId64 " \n", id++, p->key.i64Key); + break; + case TSDB_DATA_TYPE_BINARY: + fprintf(stdout, "%d: %s \n", id++, p->key.pz); + break; + case TSDB_DATA_TYPE_DOUBLE: + fprintf(stdout, "%d: %lf \n", id++, p->key.dKey); + break; + default: + fprintf(stdout, "\n"); + } + p = p->pForward[nlevel - 1]; + } +} + +/* + * query processor based on query condition + */ +int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult) { + // query condition check + int32_t rel = 0; + __compar_fn_t comparator = getKeyComparator(pQueryCond->lowerBnd.nType); + + if (pSkipList == NULL || pQueryCond == NULL || pSkipList->nSize == 0 || + (((rel = comparator(&pQueryCond->lowerBnd, &pQueryCond->upperBnd)) > 0 && + pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_NCHAR && pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_BINARY))) { + (*pResult) = NULL; + return 0; + } + + if (rel == 0) { + /* + * 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd + * point query + */ + if (pQueryCond->lowerBndRelOptr == TSDB_RELATION_LARGE_EQUAL && + pQueryCond->upperBndRelOptr == TSDB_RELATION_LESS_EQUAL) { // point query + return tSkipListGets(pSkipList, &pQueryCond->lowerBnd, pResult); + } else { + (*pResult) = NULL; + return 0; + } + } else { + /* range query, query operation code check */ + return tSkipListRangeQuery(pSkipList, pQueryCond, pResult); + } +} + +typedef struct MultipleQueryResult { + int32_t len; + tSkipListNode **pData; +} MultipleQueryResult; + +static int32_t mergeQueryResult(MultipleQueryResult *pResults, int32_t numOfResSet, tSkipListNode ***pRes) { + int32_t total = 0; + for (int32_t i = 0; i < numOfResSet; ++i) { + total += pResults[i].len; + } + + (*pRes) = malloc(POINTER_BYTES * total); + int32_t idx = 0; + + for (int32_t i = 0; i < numOfResSet; ++i) { + MultipleQueryResult *pOneResult = &pResults[i]; + for (int32_t j = 0; j < pOneResult->len; ++j) { + (*pRes)[idx++] = pOneResult->pData[j]; + } + } + + return total; +} + +static void removeDuplicateKey(tSkipListKey *pKey, int32_t *numOfKey, __compar_fn_t comparator) { + if (*numOfKey == 1) { + return; + } + + qsort(pKey, *numOfKey, sizeof(pKey[0]), comparator); + int32_t i = 0, j = 1; + + while (i < (*numOfKey) && j < (*numOfKey)) { + int32_t ret = comparator(&pKey[i], &pKey[j]); + if (ret == 0) { + j++; + } else { + pKey[i + 1] = pKey[j]; + i++; + j++; + } + } + + (*numOfKey) = i + 1; +} + +int32_t mergeResult(const tSkipListKey *pKey, int32_t numOfKey, tSkipListNode ***pRes, __compar_fn_t comparator, + tSkipListNode *pNode) { + int32_t i = 0, j = 0; + // merge two sorted arrays in O(n) time + while (i < numOfKey && pNode != NULL) { + int32_t ret = comparator(&pNode->key, &pKey[i]); + if (ret < 0) { + (*pRes)[j++] = pNode; + pNode = pNode->pForward[0]; + } else if (ret == 0) { + pNode = pNode->pForward[0]; + } else { // pNode->key > pkey[i] + i++; + } + } + + while (pNode != NULL) { + (*pRes)[j++] = pNode; + pNode = pNode->pForward[0]; + } + return j; +} + +int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type, + tSkipListNode ***pRes) { + if (numOfKey == 0 || pKey == NULL || pSkipList == NULL || pSkipList->nSize == 0 || + (type != INCLUDE_POINT_QUERY && type != EXCLUDE_POINT_QUERY)) { + (*pRes) = NULL; + return 0; + } + + __compar_fn_t comparator = getKeyComparator(pKey->nType); + removeDuplicateKey(pKey, &numOfKey, comparator); + + if (type == INCLUDE_POINT_QUERY) { + if (numOfKey == 1) { + return tSkipListGets(pSkipList, &pKey[0], pRes); + } else { + MultipleQueryResult *pTempResult = (MultipleQueryResult *)malloc(sizeof(MultipleQueryResult) * numOfKey); + for (int32_t i = 0; i < numOfKey; ++i) { + pTempResult[i].len = tSkipListGets(pSkipList, &pKey[i], &pTempResult[i].pData); + } + int32_t num = mergeQueryResult(pTempResult, numOfKey, pRes); + + for (int32_t i = 0; i < numOfKey; ++i) { + free(pTempResult[i].pData); + } + free(pTempResult); + return num; + } + } else { // exclude query + *pRes = malloc(POINTER_BYTES * pSkipList->nSize); + + __compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType); + + tSkipListNode *pNode = pSkipList->pHead.pForward[0]; + int32_t retLen = mergeResult(pKey, numOfKey, pRes, filterComparator, pNode); + + if (retLen < pSkipList->nSize) { + (*pRes) = realloc(*pRes, POINTER_BYTES * retLen); + } + return retLen; + } +} diff --git a/src/util/src/thistogram.c b/src/util/src/thistogram.c index b603163b399eb0af3194576fefe4b390c2ec2ccd..31045a4957be59d924eb70fb0876eefb6715aaa2 100644 --- a/src/util/src/thistogram.c +++ b/src/util/src/thistogram.c @@ -126,7 +126,7 @@ SHistogramInfo* tHistogramCreate(int32_t numOfEntries) { SHistogramInfo* pHisto = malloc(sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfEntries + 1)); #if !defined(USE_ARRAYLIST) - pHisto->pList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + pHisto->pList = SSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); SInsertSupporter* pss = malloc(sizeof(SInsertSupporter)); pss->numOfEntries = pHisto->maxEntries; pss->pSkipList = pHisto->pList; @@ -185,7 +185,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { SHistBin* entry = calloc(1, sizeof(SHistBin)); entry->val = val; - tSkipListNode* pResNode = tSkipListPut((*pHisto)->pList, entry, &key, 0); + tSkipListNode* pResNode = SSkipListPut((*pHisto)->pList, entry, &key, 0); SHistBin* pEntry1 = (SHistBin*)pResNode->pData; pEntry1->index = -1; @@ -239,7 +239,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { // set the right value for loser-tree assert((*pHisto)->pLoserTree != NULL); if (!(*pHisto)->ordered) { - tSkipListPrint((*pHisto)->pList, 1); + SSkipListPrint((*pHisto)->pList, 1); SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0]; @@ -281,7 +281,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { printf("delta is:%lf\n", pEntry1->delta); - tSkipListPrint((*pHisto)->pList, 1); + SSkipListPrint((*pHisto)->pList, 1); /* the chosen node */ tSkipListNode* pNode = (*pHisto)->pLoserTree->pNode[0].pData; @@ -319,7 +319,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { tLoserTreeAdjust(pTree, pEntry->index + pTree->numOfEntries); // remove the next node in skiplist tSkipListRemoveNode((*pHisto)->pList, pNext); - tSkipListPrint((*pHisto)->pList, 1); + SSkipListPrint((*pHisto)->pList, 1); tLoserTreeDisplay((*pHisto)->pLoserTree); } else { // add to heap diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 22770e379906aeb5d226a38b05e45da144323dc2..8224507f1c68576a0c9fe5f627eb5cb9539eea8f 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -6,4 +6,4 @@ add_subdirectory(tsdb) enable_testing() -add_subdirectory(tests) \ No newline at end of file +# add_subdirectory(tests) \ No newline at end of file diff --git a/src/vnode/common/inc/dataformat.h b/src/vnode/common/inc/dataformat.h index 9399d38023f2a8cff6ba5b840575d9ed40d8b7c5..43eb1ad103afac0ea75f94a04238a30d5269d361 100644 --- a/src/vnode/common/inc/dataformat.h +++ b/src/vnode/common/inc/dataformat.h @@ -12,10 +12,11 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#if !defined(_TD_DATA_FORMAT_H_) +#ifndef _TD_DATA_FORMAT_H_ #define _TD_DATA_FORMAT_H_ #include +#include #include "schema.h" @@ -24,23 +25,57 @@ extern "C" { #endif // ----------------- Data row structure -/* A data row, the format of it is like below: +/* A data row, the format is like below: * +---------+---------------------------------+ * | int32_t | | * +---------+---------------------------------+ - * | len | data | + * | len | row | * +---------+---------------------------------+ + * len: the length including sizeof(row) + sizeof(len) + * row: actual row data encoding */ -typedef char* SDataRow; +typedef void *SDataRow; + +#define dataRowLen(r) (*(int32_t *)(r)) +#define dataRowTuple(r) ((char *)(r) + sizeof(int32_t)) +#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) +#define dataRowIdx(r, i) ((char *)(r) + i) +#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) + +SDataRow tdNewDataRow(int32_t bytes); +SDataRow tdNewDdataFromSchema(SSchema *pSchema); +void tdFreeDataRow(SDataRow row); +int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset); +void tdDataRowCpy(void *dst, SDataRow row); +void tdDataRowReset(SDataRow row); +SDataRow tdDataRowDup(SDataRow row); /* Data rows definition, the format of it is like below: - * +---------+---------+-----------------------+--------+-----------------------+ - * | int32_t | int32_t | | | | - * +---------+---------+-----------------------+--------+-----------------------+ - * | len | nrows | SDataRow | .... | SDataRow | - * +---------+---------+-----------------------+--------+-----------------------+ + * +---------+-----------------------+--------+-----------------------+ + * | int32_t | | | | + * +---------+-----------------------+--------+-----------------------+ + * | len | SDataRow | .... | SDataRow | + * +---------+-----------------------+--------+-----------------------+ */ -typedef char * SDataRows; +typedef void *SDataRows; + +#define TD_DATA_ROWS_HEAD_LEN sizeof(int32_t) + +#define dataRowsLen(rs) (*(int32_t *)(rs)) +#define dataRowsSetLen(rs, l) (dataRowsLen(rs) = (l)) +#define dataRowsInit(rs) dataRowsSetLen(rs, sizeof(int32_t)) + +void tdDataRowsAppendRow(SDataRows rows, SDataRow row); + +// Data rows iterator +typedef struct { + int32_t totalLen; + int32_t len; + SDataRow row; +} SDataRowsIter; + +void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter); +SDataRow tdDataRowsNext(SDataRowsIter *pIter); /* Data column definition * +---------+---------+-----------------------+ @@ -49,7 +84,7 @@ typedef char * SDataRows; * | len | npoints | data | * +---------+---------+-----------------------+ */ -typedef char * SDataCol; +typedef char *SDataCol; /* Data columns definition * +---------+---------+-----------------------+--------+-----------------------+ @@ -58,38 +93,10 @@ typedef char * SDataCol; * | len | npoints | SDataCol | .... | SDataCol | * +---------+---------+-----------------------+--------+-----------------------+ */ -typedef char * SDataCols; - -// ----------------- Data column structure - -// ---- operation on SDataRow; -#define TD_DATA_ROW_HEADER_SIZE sizeof(int32_t) -#define TD_DATAROW_LEN(pDataRow) (*(int32_t *)(pDataRow)) -#define TD_DATAROW_DATA(pDataRow) ((pDataRow) + sizeof(int32_t)) - -SDataRow tdSDataRowDup(SDataRow rdata); - -// ---- operation on SDataRows -#define TD_DATAROWS_LEN(pDataRows) (*(int32_t *)(pDataRows)) -#define TD_DATAROWS_ROWS(pDataRows) (*(int32_t *)(pDataRows + sizeof(int32_t))) -#define TD_NEXT_DATAROW(pDataRow) ((pDataRow) + TD_DATAROW_LEN(pDataRow)) - -// ---- operation on SDataCol -#define TD_DATACOL_LEN(pDataCol) (*(int32_t *)(pDataCol)) -#define TD_DATACOL_NPOINTS(pDataCol) (*(int32_t *)(pDataCol + sizeof(int32_t))) - -// ---- operation on SDataCols -#define TD_DATACOLS_LEN(pDataCols) (*(int32_t *)(pDataCols)) -#define TD_DATACOLS_NPOINTS(pDataCols) (*(int32_t *)(pDataCols + sizeof(int32_t))) - -// ---- -/** - * Get the maximum - */ -int32_t tdGetMaxDataRowSize(SSchema *pSchema); +typedef char *SDataCols; #ifdef __cplusplus } #endif -#endif // _TD_DATA_FORMAT_H_ +#endif // _TD_DATA_FORMAT_H_ diff --git a/src/vnode/common/inc/key.h b/src/vnode/common/inc/key.h deleted file mode 100644 index 1f0478bda3b8a1adc19bafb8a1a598cc9544ec1b..0000000000000000000000000000000000000000 --- a/src/vnode/common/inc/key.h +++ /dev/null @@ -1,10 +0,0 @@ -#if !defined(_TD_KEY_H_) -#define _TD_KEY_H_ - -typedef struct { - -} key; - - - -#endif // _TD_KEY_H_ diff --git a/src/vnode/common/inc/list.h b/src/vnode/common/inc/list.h deleted file mode 100644 index 489e39442b4b0fb563eb12d80a73dca297f6845c..0000000000000000000000000000000000000000 --- a/src/vnode/common/inc/list.h +++ /dev/null @@ -1,20 +0,0 @@ -#if !defined(_TD_LIST_H_) -#define _TD_LIST_H_ - -#include - -typedef enum { TD_LIST_ORDERED, TD_LIST_UNORDERED } TLIST_TYPE; - -typedef int32_t (* comparefn(void *key1, void *key2)); - -struct _list_type { - TLIST_TYPE type; -}; - -typedef struct _list_node { -} SListNode; - -typedef struct _list { -} SList; - -#endif // _TD_LIST_H_ diff --git a/src/vnode/common/inc/schema.h b/src/vnode/common/inc/schema.h index 5387dbf99bdf9d8f404733db66c1bddd3bafbc32..97d177ecffdce76baa55a3588f2300eba78e4750 100644 --- a/src/vnode/common/inc/schema.h +++ b/src/vnode/common/inc/schema.h @@ -1,4 +1,4 @@ -#if !defined(_TD_SCHEMA_H_) +#ifndef _TD_SCHEMA_H_ #define _TD_SCHEMA_H_ #include @@ -6,25 +6,50 @@ #include "type.h" -// Column definition -// TODO: if we need to align the structure +#ifdef __cplusplus +extern "C" { +#endif + +// ---- Column definition and operations typedef struct { - td_datatype_t type; // Column type - int32_t colId; // column ID - int32_t bytes; // column bytes - int32_t offset; // point offset in a row data - char * colName; // the column name + int8_t type; // Column type + int16_t colId; // column ID + int16_t bytes; // column bytes + int32_t offset; // point offset in a row data } SColumn; -// Schema definition +#define colType(col) ((col)->type) +#define colColId(col) ((col)->colId) +#define colBytes(col) ((col)->bytes) +#define colOffset(col) ((col)->offset) + +#define colSetType(col, t) (colType(col) = (t)) +#define colSetColId(col, id) (colColId(col) = (id)) +#define colSetBytes(col, b) (colBytes(col) = (b)) +#define colSetOffset(col, o) (colOffset(col) = (o)) + +SColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes); +void tdFreeCol(SColumn *pCol); +void tdColCpy(SColumn *dst, SColumn *src); + +// ---- Schema definition and operations typedef struct { - int32_t version; // schema version, it is used to change the schema int32_t numOfCols; - int32_t numOfTags; - int32_t colIdCounter; - SColumn *columns; + int32_t padding; // TODO: replace the padding for useful variable + SColumn columns[]; } SSchema; +#define schemaNCols(s) ((s)->numOfCols) +#define schemaColAt(s, i) ((s)->columns + i) + +SSchema *tdNewSchema(int32_t nCols); +SSchema *tdDupSchema(SSchema *pSchema); +void tdFreeSchema(SSchema *pSchema); +void tdUpdateSchema(SSchema *pSchema); +int32_t tdMaxRowDataBytes(SSchema *pSchema); + +// ---- Inline schema definition and operations + /* Inline schema definition * +---------+---------+---------+-----+---------+-----------+-----+-----------+ * | int32_t | | | | | | | | @@ -34,41 +59,10 @@ typedef struct { */ typedef char *SISchema; -// TODO: decide if the space is allowed -#define TD_ISCHEMA_HEADER_SIZE sizeof(int32_t) + sizeof(SSchema) - -// ---- operations on SColumn -#define TD_COLUMN_TYPE(pCol) ((pCol)->type) // column type -#define TD_COLUMN_ID(pCol) ((pCol)->colId) // column ID -#define TD_COLUMN_BYTES(pCol) ((pCol)->bytes) // column bytes -#define TD_COLUMN_OFFSET(pCol) ((pCol)->offset) // column bytes -#define TD_COLUMN_NAME(pCol) ((pCol)->colName) // column name -#define TD_COLUMN_INLINE_SIZE(pCol) (sizeof(SColumn) + TD_COLUMN_NAME(pCol) + 1) - -// ---- operations on SSchema -#define TD_SCHEMA_VERSION(pSchema) ((pSchema)->version) // schema version -#define TD_SCHEMA_NCOLS(pSchema) ((pSchema)->numOfCols) // schema number of columns -#define TD_SCHEMA_NTAGS(pSchema) ((pSchema)->numOfTags) // schema number of tags -#define TD_SCHEMA_TOTAL_COLS(pSchema) (TD_SCHEMA_NCOLS(pSchema) + TD_SCHEMA_NTAGS(pSchema)) // schema total number of SColumns (#columns + #tags) -#define TD_SCHEMA_NEXT_COLID(pSchema) ((pSchema)->colIdCounter++) -#define TD_SCHEMA_COLS(pSchema) ((pSchema)->columns) -#define TD_SCHEMA_TAGS(pSchema) (TD_SCHEMA_COLS(pSchema) + TD_SCHEMA_NCOLS(pSchema)) -#define TD_SCHEMA_COLUMN_AT(pSchema, idx) (TD_SCHEMA_COLS(pSchema) + idx) -#define TD_SCHEMA_TAG_AT(pSchema, idx) (TD_SCHEMA_TAGS(pSchema) + idx) - -// ---- operations on SISchema -#define TD_ISCHEMA_LEN(pISchema) *((int32_t *)(pISchema)) -#define TD_ISCHEMA_SCHEMA(pISchema) ((SSchema *)((pISchema) + sizeof(int32_t))) -#define TD_ISCHEMA_COL_NAMES(pISchema) ((pISchema) + TD_ISCHEMA_HEADER_SIZE + sizeof(SColumn) * TD_SCHEMA_TOTAL_COLS(TD_ISCHEMA_SCHEMA(pISchema))) - -// ---- -/* Convert a schema structure to an inline schema structure - */ -SISchema tdConvertSchemaToInline(SSchema *pSchema); -int32_t tdGetColumnIdxByName(SSchema *pSchema, char *colName); -int32_t tdGetColumnIdxById(SSchema *pSchema, int32_t colId); -SSchema *tdDupSchema(SSchema *pSchema); +// TODO: add operations on SISchema -// ---- TODO: operations to modify schema +#ifdef __cplusplus +} +#endif #endif // _TD_SCHEMA_H_ diff --git a/src/vnode/common/inc/type.h b/src/vnode/common/inc/type.h index 1aaa95cb90baa041a98dff971cbf4ef47e7ced55..4ca80cee339f3b7978d055fa3356c3caf9141a71 100644 --- a/src/vnode/common/inc/type.h +++ b/src/vnode/common/inc/type.h @@ -1,4 +1,4 @@ -#if !defined(_TD_TYPE_H_) +#ifndef _TD_TYPE_H_ #define _TD_TYPE_H_ #include @@ -11,6 +11,7 @@ typedef enum { TD_DATATYPE_BIGINT, TD_DATATYPE_FLOAT, TD_DATATYPE_DOUBLE, + TD_DATATYPE_TIMESTAMP, TD_DATATYPE_VARCHAR, TD_DATATYPE_NCHAR, TD_DATATYPE_BINARY @@ -26,6 +27,7 @@ extern const int32_t rowDataLen[]; #define TD_DATATYPE_BIGINT_NULL #define TD_DATATYPE_FLOAT_NULL #define TD_DATATYPE_DOUBLE_NULL +#define TD_DATATYPE_TIMESTAMP_NULL #define TD_DATATYPE_VARCHAR_NULL #define TD_DATATYPE_NCHAR_NULL #define TD_DATATYPE_BINARY_NULL diff --git a/src/vnode/common/src/dataformat.c b/src/vnode/common/src/dataformat.c index 1f2dfc7dad573e75e0066ccd4819142ee452e166..f8f794e882fe174bac658a8e8e4a97bba4eb5b2d 100644 --- a/src/vnode/common/src/dataformat.c +++ b/src/vnode/common/src/dataformat.c @@ -1,34 +1,123 @@ -#include - #include "dataformat.h" -int32_t tdGetMaxDataRowSize(SSchema *pSchema) { - int32_t nbytes = 0; - - for (int32_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) { - SColumn * pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); - td_datatype_t type = TD_COLUMN_TYPE(pCol); - - nbytes += rowDataLen[type]; - - switch (type) { - case TD_DATATYPE_VARCHAR: - nbytes += TD_COLUMN_BYTES(pCol); - break; - case TD_DATATYPE_NCHAR: - nbytes += 4 * TD_COLUMN_BYTES(pCol); - break; - case TD_DATATYPE_BINARY: - nbytes += TD_COLUMN_BYTES(pCol); - break; - default: - break; - } +/** + * Create a data row with maximum row length bytes. + * + * NOTE: THE AAPLICATION SHOULD MAKE SURE BYTES IS LARGE ENOUGH TO + * HOLD THE WHOE ROW. + * + * @param bytes max bytes a row can take + * @return SDataRow object for success + * NULL for failure + */ +SDataRow tdNewDataRow(int32_t bytes) { + int32_t size = sizeof(int32_t) + bytes; + + SDataRow row = malloc(size); + if (row == NULL) return NULL; + + dataRowSetLen(row, sizeof(int32_t)); + + return row; +} + +SDataRow tdNewDdataFromSchema(SSchema *pSchema) { + int32_t bytes = tdMaxRowDataBytes(pSchema); + return tdNewDataRow(bytes); +} + +/** + * Free the SDataRow object + */ +void tdFreeDataRow(SDataRow row) { + if (row) free(row); +} + +/** + * Append a column value to a SDataRow object. + * NOTE: THE APPLICATION SHOULD MAKE SURE VALID PARAMETERS. THE FUNCTION ASSUMES + * THE ROW OBJECT HAS ENOUGH SPACE TO HOLD THE VALUE. + * + * @param row the row to append value to + * @param value value pointer to append + * @param pSchema schema + * @param colIdx column index + * + * @return 0 for success and -1 for failure + */ +int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset) { + int32_t offset; + + switch (pCol->type) { + case TD_DATATYPE_BOOL: + case TD_DATATYPE_TINYINT: + case TD_DATATYPE_SMALLINT: + case TD_DATATYPE_INT: + case TD_DATATYPE_BIGINT: + case TD_DATATYPE_FLOAT: + case TD_DATATYPE_DOUBLE: + case TD_DATATYPE_TIMESTAMP: + memcpy(dataRowIdx(row, pCol->offset + sizeof(int32_t)), value, rowDataLen[pCol->type]); + if (dataRowLen(row) < suffixOffset + sizeof(int32_t)) + dataRowSetLen(row, dataRowLen(row) + rowDataLen[pCol->type]); + break; + case TD_DATATYPE_VARCHAR: + offset = dataRowLen(row) > suffixOffset ? dataRowLen(row) : suffixOffset; + memcpy(dataRowIdx(row, pCol->offset+sizeof(int32_t)), (void *)(&offset), sizeof(offset)); + case TD_DATATYPE_NCHAR: + case TD_DATATYPE_BINARY: + break; + default: + return -1; } - nbytes += TD_DATA_ROW_HEADER_SIZE; + return 0; +} - return nbytes; +/** + * Copy a data row to a destination + * ASSUMPTIONS: dst has enough room for a copy of row + */ +void tdDataRowCpy(void *dst, SDataRow row) { memcpy(dst, row, dataRowLen(row)); } +void tdDataRowReset(SDataRow row) { dataRowSetLen(row, sizeof(int32_t)); } +SDataRow tdDataRowDup(SDataRow row) { + SDataRow trow = tdNewDataRow(dataRowLen(row)); + if (trow == NULL) return NULL; + + dataRowCpy(trow, row); + return row; +} + +void tdDataRowsAppendRow(SDataRows rows, SDataRow row) { + tdDataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row); + dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row)); } -SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; } \ No newline at end of file +// Initialize the iterator +void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter) { + if (pIter == NULL) return; + pIter->totalLen = dataRowsLen(rows); + + if (pIter->totalLen == TD_DATA_ROWS_HEAD_LEN) { + pIter->row = NULL; + return; + } + + pIter->row = (SDataRow)((char *)rows + TD_DATA_ROWS_HEAD_LEN); + pIter->len = TD_DATA_ROWS_HEAD_LEN + dataRowLen(pIter->row); +} + +// Get the next row in Rows +SDataRow tdDataRowsNext(SDataRowsIter *pIter) { + SDataRow row = pIter->row; + if (row == NULL) return NULL; + + if (pIter->len >= pIter->totalLen) { + pIter->row = NULL; + } else { + pIter->row = (char *)row + dataRowLen(row); + pIter->len += dataRowLen(row); + } + + return row; +} \ No newline at end of file diff --git a/src/vnode/common/src/schema.c b/src/vnode/common/src/schema.c index eb1b4eb84b3d716c7091553ffde83135b75fc691..be7007ca8bb58d3547dbffb9064e5216791b4ae7 100644 --- a/src/vnode/common/src/schema.c +++ b/src/vnode/common/src/schema.c @@ -1,6 +1,7 @@ #include #include "schema.h" + const int32_t rowDataLen[] = { sizeof(int8_t), // TD_DATATYPE_BOOL, sizeof(int8_t), // TD_DATATYPE_TINYINT, @@ -9,93 +10,127 @@ const int32_t rowDataLen[] = { sizeof(int64_t), // TD_DATATYPE_BIGINT, sizeof(float), // TD_DATATYPE_FLOAT, sizeof(double), // TD_DATATYPE_DOUBLE, + sizeof(int64_t), // TD_DATATYPE_TIMESTAMP sizeof(int32_t), // TD_DATATYPE_VARCHAR, sizeof(int32_t), // TD_DATATYPE_NCHAR, sizeof(int32_t) // TD_DATATYPE_BINARY }; -static size_t tdGetEstimatedISchemaLen(SSchema *pSchema) { - size_t colNameLen = 0; - for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) { - colNameLen += (strlen(TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i))) + 1); - } - - for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) { - colNameLen += (strlen(TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i))) + 1); +/** + * Create a new SColumn object + * ASSUMPTIONS: VALID PARAMETERS + * + * @param type column type + * @param colId column ID + * @param bytes maximum bytes the col taken + * + * @return a SColumn object on success + * NULL for failure + */ +SColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes) { + SColumn *pCol = (SColumn *)calloc(1, sizeof(SColumn)); + if (pCol == NULL) return NULL; + + colSetType(pCol, type); + colSetColId(pCol, colId); + switch (type) { + case TD_DATATYPE_VARCHAR: + case TD_DATATYPE_NCHAR: + case TD_DATATYPE_BINARY: + colSetBytes(pCol, bytes); + break; + default: + colSetBytes(pCol, rowDataLen[type]); + break; } - return TD_ISCHEMA_HEADER_SIZE + (size_t)TD_SCHEMA_TOTAL_COLS(pSchema) + colNameLen; + return pCol; } -static void tdUpdateColumnOffsets(SSchema *pSchema) { - int32_t offset = 0; - for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) - { - SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); - TD_COLUMN_OFFSET(pCol) = offset; - offset += rowDataLen[TD_COLUMN_TYPE(pCol)]; - } - - offset = 0; - for (size_t i = 0; i < TD_SCHEMA_NTAGS(pSchema); i++) { - SColumn *pCol = TD_SCHEMA_TAG_AT(pSchema, i); - TD_COLUMN_OFFSET(pCol) = offset; - offset += rowDataLen[TD_COLUMN_TYPE(pCol)]; - } +/** + * Free a SColumn object CREATED with tdNewCol + */ +void tdFreeCol(SColumn *pCol) { + if (pCol) free(pCol); } -SISchema tdConvertSchemaToInline(SSchema *pSchema) { - size_t len = tdGetEstimatedISchemaLen(pSchema); - int32_t totalCols = TD_SCHEMA_TOTAL_COLS(pSchema); - // TODO: if use pISchema is reasonable? - SISchema pISchema = malloc(len); - if (pSchema == NULL) { - // TODO: add error handling - return NULL; - } - - TD_ISCHEMA_LEN(pISchema) = (int32_t)len; - memcpy((void *)TD_ISCHEMA_SCHEMA(pISchema), (void *)pSchema, sizeof(SSchema)); - TD_SCHEMA_COLS(TD_ISCHEMA_SCHEMA(pISchema)) = (SColumn *)(pISchema + TD_ISCHEMA_HEADER_SIZE); - memcpy((void *)TD_SCHEMA_COLS(TD_ISCHEMA_SCHEMA(pISchema)), (void *)TD_SCHEMA_COLS(pSchema), - sizeof(SColumn) * totalCols); +void tdColCpy(SColumn *dst, SColumn *src) { memcpy((void *)dst, (void *)src, sizeof(SColumn)); } + +/** + * Create a SSchema object with nCols columns + * ASSUMPTIONS: VALID PARAMETERS + * + * @param nCols number of columns the schema has + * + * @return a SSchema object for success + * NULL for failure + */ +SSchema *tdNewSchema(int32_t nCols) { + int32_t size = sizeof(SSchema) + sizeof(SColumn) * nCols; + + SSchema *pSchema = (SSchema *)calloc(1, size); + if (pSchema == NULL) return NULL; + pSchema->numOfCols = nCols; + + return pSchema; +} - char *pName = TD_ISCHEMA_COL_NAMES(pISchema); - for (int32_t i = 0; i < totalCols; i++) { - SColumn *pCol = TD_SCHEMA_COLUMN_AT(TD_ISCHEMA_SCHEMA(pISchema), i); - char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i)); +/** + * Free the SSchema object created by tdNewSchema or tdDupSchema + */ +void tdFreeSchema(SSchema *pSchema) { + if (pSchema == NULL) free(pSchema); +} - TD_COLUMN_NAME(pCol) = pName; +SSchema *tdDupSchema(SSchema *pSchema) { + SSchema *tSchema = tdNewSchema(schemaNCols(pSchema)); + if (tSchema == NULL) return NULL; - size_t tlen = strlen(colName) + 1; - memcpy((void *)pName, (void *)colName, tlen); - pName += tlen; - } + int32_t size = sizeof(SSchema) + sizeof(SColumn) * schemaNCols(pSchema); + memcpy((void *)tSchema, (void *)pSchema, size); - return pISchema; + return tSchema; } -int32_t tdGetColumnIdxByName(SSchema *pSchema, char *colName) { - for (int32_t i = 0; i < TD_SCHEMA_TOTAL_COLS(pSchema); i++) { - SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); - if (strcmp(colName, TD_COLUMN_NAME(pCol)) == 0) { - return i; - } +/** + * Function to update each columns's offset field in the schema. + * ASSUMPTIONS: VALID PARAMETERS + */ +void tdUpdateSchema(SSchema *pSchema) { + SColumn *pCol = NULL; + int32_t offset = 0; + for (int i = 0; i < schemaNCols(pSchema); i++) { + pCol = schemaColAt(pSchema, i); + colSetOffset(pCol, offset); + offset += rowDataLen[pCol->type]; } - - return -1; } -int32_t tdGetColumnIdxById(SSchema *pSchema, int32_t colId) { - for (int32_t i = 0; i < TD_SCHEMA_TOTAL_COLS(pSchema); i++) { - SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); - if (TD_COLUMN_ID(pCol) == colId) { - return i; +/** + * Get the maximum size of a row data with the schema + */ +int32_t tdMaxRowDataBytes(SSchema *pSchema) { + int32_t size = 0; + SColumn *pCol = NULL; + for (int i = 0; i < schemaNCols(pSchema); i++) { + pCol = schemaColAt(pSchema, i); + size += rowDataLen[pCol->type]; + + switch (pCol->type) { + case TD_DATATYPE_VARCHAR: + size += (pCol->bytes + 1); // TODO: remove literal here + break; + case TD_DATATYPE_NCHAR: + size += (pCol->bytes + 4); // TODO: check and remove literal here + break; + case TD_DATATYPE_BINARY: + size += pCol->bytes; + break; + + default: + break; } } - return -1; -} -SSchema *tdDupSchema(SSchema *pSchema) { - return NULL; + return size; } \ No newline at end of file diff --git a/src/vnode/tests/CMakeLists.txt b/src/vnode/tests/CMakeLists.txt deleted file mode 100644 index 786fa9a66f8959ca6faa1278f59c903939218e05..0000000000000000000000000000000000000000 --- a/src/vnode/tests/CMakeLists.txt +++ /dev/null @@ -1,3 +0,0 @@ -add_subdirectory(common) - -add_subdirectory(tsdb) \ No newline at end of file diff --git a/src/vnode/tests/common/CMakeLists.txt b/src/vnode/tests/common/CMakeLists.txt deleted file mode 100644 index 093768be1a1398932d66825f2f2dcf4151ae1d65..0000000000000000000000000000000000000000 --- a/src/vnode/tests/common/CMakeLists.txt +++ /dev/null @@ -1,14 +0,0 @@ -aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) - -message(STATUS "COMMON: ${SOURCE_LIST}") - -add_executable(commonTests ${SOURCE_LIST}) - -target_link_libraries(commonTests gtest gtest_main pthread common) - -add_test( - NAME - unit - COMMAND - ${CMAKE_CURRENT_BINARY_DIR}/commonTests -) \ No newline at end of file diff --git a/src/vnode/tests/common/commonDataTests.cpp b/src/vnode/tests/common/commonDataTests.cpp deleted file mode 100644 index 7becb8699c117d50e19d4f5a8ed8944a82da794e..0000000000000000000000000000000000000000 --- a/src/vnode/tests/common/commonDataTests.cpp +++ /dev/null @@ -1,7 +0,0 @@ -#include - -#include "dataformat.h" - -TEST(commonDataTests, createDataRow) { - EXPECT_EQ(1, 2/2); -} \ No newline at end of file diff --git a/src/vnode/tests/common/commonSChemaTests.cpp b/src/vnode/tests/common/commonSChemaTests.cpp deleted file mode 100644 index 44fd384b61c567487477b3c60e24630c4b4e935c..0000000000000000000000000000000000000000 --- a/src/vnode/tests/common/commonSChemaTests.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include -#include -#include - -#include "schema.h" - -TEST(commonSchemaTests, createSchema) { - EXPECT_EQ(1, 2/2); -} \ No newline at end of file diff --git a/src/vnode/tests/tsdb/tsdbTests.cpp b/src/vnode/tests/tsdb/tsdbTests.cpp deleted file mode 100644 index 3e7a7cca5f4cc6ce5f443416c6b963bff764a5b9..0000000000000000000000000000000000000000 --- a/src/vnode/tests/tsdb/tsdbTests.cpp +++ /dev/null @@ -1,23 +0,0 @@ -#include -#include - -#include "tsdb.h" - -TEST(TsdbTest, createTsdbRepo) { - STsdbCfg config; - - config.precision = TSDB_PRECISION_MILLI; - config.tsdbId = 0; - config.maxTables = 100; - config.daysPerFile = 10; - config.keep = 3650; - config.minRowsPerFileBlock = 100; - config.maxRowsPerFileBlock = 4096; - config.maxCacheSize = 4 * 1024 * 1024; - - tsdb_repo_t *pRepo = tsdbCreateRepo("/root/mnt/test/vnode0", &config, NULL); - - ASSERT_NE(pRepo, nullptr); - - tsdbCloseRepo(pRepo); -} \ No newline at end of file diff --git a/src/vnode/tsdb/CMakeLists.txt b/src/vnode/tsdb/CMakeLists.txt index 4e66b98528b10685163fcf99880fe6c025709b10..9736f2cb88dad4aa2ba502d602ea5da4821eed75 100644 --- a/src/vnode/tsdb/CMakeLists.txt +++ b/src/vnode/tsdb/CMakeLists.txt @@ -5,6 +5,9 @@ target_link_libraries(tsdb common tutil) target_include_directories(tsdb PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc" + PUBLIC "${CMAKE_SOURCE_DIR}/src/inc" PUBLIC "${CMAKE_SOURCE_DIR}/src/util/inc" PUBLIC "${CMAKE_SOURCE_DIR}/src/os/linux/inc" - ) \ No newline at end of file + ) + +add_subdirectory(tests) \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 655448ed16f0685f7209f8c69a4498ad227930d4..c8d5f600d96ed55597afffa6ad43ca6f5a4bf8dd 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -46,8 +46,9 @@ typedef struct { // Submit message for one table typedef struct { STableId tableId; + int32_t padding; // TODO just for padding here int32_t sversion; // data schema version - int32_t numOfRows; // number of rows data + int32_t len; // message length char data[]; } SSubmitBlock; @@ -98,6 +99,17 @@ typedef struct { int64_t tableTotalDiskSize; // In bytes } STableInfo; +/** + * Create a configuration for TSDB default + * @return a pointer to a configuration. the configuration must call tsdbFreeCfg to free memory after usage + */ +STsdbCfg *tsdbCreateDefaultCfg(); + +/** + * Free + */ +void tsdbFreeCfg(STsdbCfg *pCfg); + /** * Create a new TSDB repository * @param rootDir the TSDB repository root directory @@ -171,7 +183,7 @@ int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg); * * @return 0 for success, -1 for failure and the error number is set */ -int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tid); +int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId); /** * Get the information of a table in the repository @@ -187,13 +199,11 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid); /** * Insert data to a table in a repository * @param pRepo the TSDB repository handle - * @param tid the table ID to insert to * @param pData the data to insert (will give a more specific description) - * @param error the error number to set when failure occurs * * @return the number of points inserted, -1 for failure and the error number is set */ -int32_t tsdbInsertData(tsdb_repo_t *pRepo, STableId tid, char *pData); +int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg); // -- FOR QUERY TIME SERIES DATA diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 31d8221723f377fc0fec456bdd65164e035ba067..8a78a6b19e4ff48945a90273b371717df8d285d5 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -44,7 +44,7 @@ typedef struct STSDBCache { int32_t numOfBlocks; STSDBCacheBlock *cacheList; void * current; -} SCacheHandle; +} STsdbCache; // ---- Operation on STSDBCacheBlock #define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData) @@ -53,8 +53,9 @@ typedef struct STSDBCache { #define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next) #define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev) -SCacheHandle *tsdbCreateCache(int32_t numOfBlocks); -int32_t tsdbFreeCache(SCacheHandle *pHandle); +STsdbCache *tsdbCreateCache(int32_t numOfBlocks); +int32_t tsdbFreeCache(STsdbCache *pCache); +void * tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index 99930e94540aee37bd49e6b6b3cddc43df66863b..12edfece8c467dbf0ecd9bcacf08bbec99ac043e 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -17,7 +17,9 @@ #include +#include "tsdb.h" #include "dataformat.h" +#include "tskiplist.h" #ifdef __cplusplus extern "C" { @@ -34,6 +36,8 @@ typedef enum { TSDB_STABLE // table created from super table } TSDB_TABLE_TYPE; +#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL) + typedef struct STable { STableId tableId; TSDB_TABLE_TYPE type; @@ -76,8 +80,9 @@ typedef struct STable { typedef struct { int32_t maxTables; + int32_t nTables; STable **tables; // array of normal tables - STable * stables; // linked list of super tables + STable * stables; // linked list of super tables // TODO use container to implement this void * tableMap; // hash map of uid ==> STable * } STsdbMeta; @@ -105,11 +110,13 @@ STsdbMeta *tsdbCreateMeta(int32_t maxTables); int32_t tsdbFreeMeta(STsdbMeta *pMeta); // Recover the meta handle from the file -STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir); - -int32_t tsdbCreateTableImpl(STsdbMeta *pHandle, STableCfg *pCfg); +STsdbMeta *tsdbOpenMeta(char *tsdbDir); -int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, char *pData); +int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg); +int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId); +STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); +int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable); +STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 25b649b73de63b9ec7c33b656ee431f44720a3d6..dacb36025370a27267bdc72557408c3f8db93974 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -1,17 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ #include #include "tsdbCache.h" +STsdbCache *tsdbCreateCache(int32_t numOfBlocks) { + STsdbCache *pCacheHandle = (STsdbCache *)malloc(sizeof(STsdbCache)); + if (pCacheHandle == NULL) { + // TODO : deal with the error + return NULL; + } -SCacheHandle *tsdbCreateCache(int32_t numOfBlocks) { - SCacheHandle *pCacheHandle = (SCacheHandle *)malloc(sizeof(SCacheHandle)); - if (pCacheHandle == NULL) { - // TODO : deal with the error - return NULL; - } + return pCacheHandle; +} - return pCacheHandle; +int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; } -} +void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) { + // TODO: implement here + void *ptr = malloc(bytes); + if (ptr == NULL) return NULL; -int32_t tsdbFreeCache(SCacheHandle *pHandle) { return 0; } \ No newline at end of file + return ptr; +} \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c new file mode 100644 index 0000000000000000000000000000000000000000..6009d160e3b4ae80aed19024baf6dc2639929c57 --- /dev/null +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include +#include + +#include "tsdbFile.h" + +const char *tsdbFileSuffix[] = { + ".head", // TSDB_FILE_TYPE_HEAD + ".data", // TSDB_FILE_TYPE_DATA + ".last", // TSDB_FILE_TYPE_LAST + ".meta" // TSDB_FILE_TYPE_META +}; + +char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) { + if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL; + + char *fileName = (char *)malloc(strlen(dirName) + strlen(fname) + strlen(tsdbFileSuffix[type]) + 5); + if (fileName == NULL) return NULL; + + sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]); + return fileName; +} \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbFileStore.c b/src/vnode/tsdb/src/tsdbFileStore.c deleted file mode 100644 index 5c0702e29626572aa2a426006af0bc227d0b8479..0000000000000000000000000000000000000000 --- a/src/vnode/tsdb/src/tsdbFileStore.c +++ /dev/null @@ -1,22 +0,0 @@ -#include -#include -#include - -#include "tsdbFile.h" - -const char *tsdbFileSuffix[] = { - ".head", // TSDB_FILE_TYPE_HEAD - ".data", // TSDB_FILE_TYPE_DATA - ".last", // TSDB_FILE_TYPE_LAST - ".meta" // TSDB_FILE_TYPE_META -}; - -char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) { - if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL; - - char *fileName = (char *)malloc(strlen(dirName) + strlen(fname) + strlen(tsdbFileSuffix[type]) + 5); - if (fileName == NULL) return NULL; - - sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]); - return fileName; -} \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index cf217f07711e577c5c2af46efdeddee07d58c361..155ad192064d74d329bb5605788a7fc81e71ffcc 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -1,13 +1,14 @@ -#include +#include #include +#include #include #include +#include #include +#include #include #include #include -#include -#include // #include "taosdef.h" // #include "disk.h" @@ -15,6 +16,31 @@ #include "tsdbCache.h" #include "tsdbFile.h" #include "tsdbMeta.h" +#include "tutil.h" +#include "tskiplist.h" + +#define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision +#define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO)) +#define TSDB_MIN_ID 0 +#define TSDB_MAX_ID INT_MAX +#define TSDB_MIN_TABLES 10 +#define TSDB_MAX_TABLES 100000 +#define TSDB_DEFAULT_TABLES 1000 +#define TSDB_DEFAULT_DAYS_PER_FILE 10 +#define TSDB_MIN_DAYS_PER_FILE 1 +#define TSDB_MAX_DAYS_PER_FILE 60 +#define TSDB_DEFAULT_MIN_ROW_FBLOCK 100 +#define TSDB_MIN_MIN_ROW_FBLOCK 10 +#define TSDB_MAX_MIN_ROW_FBLOCK 1000 +#define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096 +#define TSDB_MIN_MAX_ROW_FBLOCK 200 +#define TSDB_MAX_MAX_ROW_FBLOCK 10000 +#define TSDB_DEFAULT_KEEP 3650 +#define TSDB_MIN_KEEP 1 +#define TSDB_MAX_KEEP INT_MAX +#define TSDB_DEFAULT_CACHE_SIZE (16 * 1024 * 1024) // 16M +#define TSDB_MIN_CACHE_SIZE (4 * 1024 * 1024) // 4M +#define TSDB_MAX_CACHE_SIZE (1024 * 1024 * 1024) // 1G enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING }; @@ -27,7 +53,7 @@ typedef struct _tsdb_repo { STsdbMeta *tsdbMeta; // The cache Handle - SCacheHandle *tsdbCache; + STsdbCache *tsdbCache; // Disk tier handle for multi-tier storage void *diskTier; @@ -49,17 +75,37 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int tsdbOpenMetaFile(char *tsdbDir); static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg); +static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_IS_REPO_ACTIVE(pRepo) ((pRepo)->state == TSDB_REPO_STATE_ACTIVE) #define TSDB_IS_REPO_CLOSED(pRepo) ((pRepo)->state == TSDB_REPO_STATE_CLOSED) -tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) { +STsdbCfg *tsdbCreateDefaultCfg() { + STsdbCfg *pCfg = (STsdbCfg *)malloc(sizeof(STsdbCfg)); + if (pCfg == NULL) return NULL; + + pCfg->precision = -1; + pCfg->tsdbId = 0; + pCfg->maxTables = -1; + pCfg->daysPerFile = -1; + pCfg->minRowsPerFileBlock = -1; + pCfg->maxRowsPerFileBlock = -1; + pCfg->keep = -1; + pCfg->maxCacheSize = -1; + return pCfg; +} + +void tsdbFreeCfg(STsdbCfg *pCfg) { + if (pCfg != NULL) free(pCfg); +} + +tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) { if (rootDir == NULL) return NULL; - if (access(rootDir, F_OK|R_OK|W_OK) == -1) return NULL; + if (access(rootDir, F_OK | R_OK | W_OK) == -1) return NULL; if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) { return NULL; @@ -200,33 +246,98 @@ int32_t tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) { return 0; } -int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tid) { - return 0; +int32_t tsdbDropTable(tsdb_repo_t *repo, STableId tableId) { + // TODO + if (repo == NULL) return -1; + STsdbRepo *pRepo = (STsdbRepo *)repo; + + return tsdbDropTableImpl(pRepo->tsdbMeta, tableId); } -STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid) { +STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) { // TODO return NULL; } -int32_t tsdbInsertData(tsdb_repo_t *repo, STableId tableId, char *pData) { - STsdbRepo *pRepo = (STsdbRepo *)repo; +// TODO: need to return the number of data inserted +int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) { + STsdbRepo * pRepo = (STsdbRepo *)repo; + SSubmitBlock *pBlock = pMsg->data; - tsdbInsertDataImpl(pRepo->tsdbMeta, tableId, pData); + for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message + if (tsdbInsertDataToTable(repo, pBlock) < 0) { + return -1; + } + pBlock = ((char *)pBlock) + sizeof(SSubmitBlock) + pBlock->len; + } return 0; } // Check the configuration and set default options static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { - // TODO + // Check precision + if (pCfg->precision == -1) { + pCfg->precision = TSDB_DEFAULT_PRECISION; + } else { + if (!IS_VALID_PRECISION(pCfg->precision)) return -1; + } + + // Check tsdbId + if (pCfg->tsdbId < 0) return -1; + + // Check MaxTables + if (pCfg->maxTables == -1) { + pCfg->maxTables = TSDB_DEFAULT_TABLES; + } else { + if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) return -1; + } + + // Check daysPerFile + if (pCfg->daysPerFile == -1) { + pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; + } else { + if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1; + } + + // Check minRowsPerFileBlock and maxRowsPerFileBlock + if (pCfg->minRowsPerFileBlock == -1) { + pCfg->minRowsPerFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK; + } else { + if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK) + return -1; + } + + if (pCfg->maxRowsPerFileBlock == -1) { + pCfg->maxRowsPerFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK; + } else { + if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) + return -1; + } + + if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) return -1; + + // Check keep + if (pCfg->keep == -1) { + pCfg->keep = TSDB_DEFAULT_KEEP; + } else { + if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) return -1; + } + + // Check maxCacheSize + if (pCfg->maxCacheSize == -1) { + pCfg->maxCacheSize = TSDB_DEFAULT_CACHE_SIZE; + } else { + if (pCfg->maxCacheSize < TSDB_MIN_CACHE_SIZE || pCfg->maxCacheSize > TSDB_MAX_CACHE_SIZE) return -1; + } + return 0; } static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) { char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META); - int fd = open(metaFname, O_WRONLY|O_CREAT); + int fd = open(metaFname, O_WRONLY | O_CREAT); if (fd < 0) { return -1; } @@ -290,5 +401,50 @@ static int tsdbOpenMetaFile(char *tsdbDir) { static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) { // TODO: read tsdb configuration from file // recover tsdb meta + return 0; +} + +static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) { + // TODO + int32_t level = 0; + int32_t headSize = 0; + + tSkipListRandNodeInfo(pTable->content.pData, &level, &headSize); + + // Copy row into the memory + SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row)); + if (pNode == NULL) { + // TODO: deal with allocate failure + } + + pNode->level = level; + tdDataRowCpy(SL_GET_NODE_DATA(pNode), row); + + // Insert the skiplist node into the data + tsdbInsertRowToTableImpl(pNode, pTable); + + return 0; +} + +static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + + STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId); + if (pTable == NULL) { + return -1; + } + + SDataRows rows = pBlock->data; + SDataRowsIter rDataIter, *pIter; + pIter = &rDataIter; + SDataRow row; + + tdInitSDataRowsIter(rows, pIter); + while ((row = tdDataRowsNext(pIter)) != NULL) { + if (tdInsertRowToTable(pRepo, row, pTable) < 0) { + // TODO: deal with the error here + } + } + return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 76801b1ddce9cfb9b1eb16d1710765711fd124d2..6c9cc2404aafb57d13d1e103c5940de600ce7efb 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -1,33 +1,30 @@ #include // #include "taosdef.h" -#include "hash.h" #include "tskiplist.h" #include "tsdb.h" +#include "taosdef.h" #include "tsdbMeta.h" +#include "hash.h" +#include "tsdbCache.h" -#define TSDB_MIN_TABLES 10 -#define TSDB_MAX_TABLES 100000 -#define TSDB_DEFAULT_NSTABLES 10 - -#define IS_VALID_MAX_TABLES(maxTables) (((maxTables) >= TSDB_MIN_TABLES) && ((maxTables) <= TSDB_MAX_TABLES)) +#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here static int tsdbFreeTable(STable *pTable); static int32_t tsdbCheckTableCfg(STableCfg *pCfg); -static STable *tsdbGetTableByUid(int64_t uid); -static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable); +static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); +static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); STsdbMeta *tsdbCreateMeta(int32_t maxTables) { - if (!IS_VALID_MAX_TABLES(maxTables)) return NULL; - STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); if (pMeta == NULL) { return NULL; } pMeta->maxTables = maxTables; + pMeta->nTables = 0; pMeta->stables = NULL; pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *)); if (pMeta->tables == NULL) { @@ -76,52 +73,60 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { } STable *pSTable = NULL; + int newSuper = 0; - if (pCfg->stableUid > 0) { // to create a TSDB_STABLE - pSTable = tsdbGetTableByUid(pCfg->stableUid); + if (IS_CREATE_STABLE(pCfg)) { // to create a TSDB_STABLE, check if super table exists + pSTable = tsdbGetTableByUid(pMeta, pCfg->stableUid); if (pSTable == NULL) { // super table not exists, try to create it + newSuper = 1; pSTable = (STable *)calloc(1, sizeof(STable)); if (pSTable == NULL) return -1; pSTable->tableId.uid = pCfg->stableUid; pSTable->tableId.tid = -1; pSTable->type = TSDB_SUPER_TABLE; - pSTable->createdTime = pCfg->createdTime; // The created time is not required + // pSTable->createdTime = pCfg->createdTime; // The created time is not required pSTable->stableUid = -1; pSTable->numOfCols = pCfg->numOfCols; pSTable->pSchema = tdDupSchema(pCfg->schema); - // pSTable->content.pIndex = tSkipListCreate(5, 0, 10); // TODO: change here - tsdbAddTable(pMeta, pSTable); + pSTable->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, + 0, NULL); // Allow duplicate key, no lock + if (pSTable->content.pIndex == NULL) { + free(pSTable); + return -1; + } } else { - if (pSTable->type != TSDB_SUPER_TABLE) return NULL; + if (pSTable->type != TSDB_SUPER_TABLE) return -1; } } STable *pTable = (STable *)malloc(sizeof(STable)); if (pTable == NULL) { + if (newSuper) tsdbFreeTable(pSTable); return -1; } pTable->tableId = pCfg->tableId; pTable->createdTime = pCfg->createdTime; - if (1 /* */) { // TSDB_STABLE + if (IS_CREATE_STABLE(pCfg)) { // TSDB_STABLE pTable->type = TSDB_STABLE; pTable->stableUid = pCfg->stableUid; - pTable->pTagVal = tdSDataRowDup(pCfg->tagValues); + pTable->pTagVal = tdDataRowDup(pCfg->tagValues); } else { // TSDB_NTABLE pTable->type = TSDB_NTABLE; pTable->stableUid = -1; pTable->pSchema = tdDupSchema(pCfg->schema); } - // pTable->content.pData = tSkipListCreate(5, 0, 10); // TODO: change here + pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, 0, 8, 0, 0, NULL); - tsdbAddTable(pMeta, pTable); + if (newSuper) tsdbAddTableToMeta(pMeta, pSTable); + tsdbAddTableToMeta(pMeta, pTable); return 0; } -STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir) { - // Open meta file for reading +STsdbMeta *tsdbOpenMeta(char *tsdbDir) { + // TODO : Open meta file for reading STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); if (pMeta == NULL) { @@ -131,29 +136,85 @@ STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir) { return pMeta; } -int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, char *pData) { - STable *pTable = pMeta->tables[tableId.tid]; +/** + * Check if a table is valid to insert. + * @return NULL for invalid and the pointer to the table if valid + */ +STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) { + STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid); if (pTable == NULL) { - // TODO: deal with the error here - return 0; + return NULL; + } + + if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) return NULL; + if (pTable->tableId.tid != tableId.tid) return NULL; + + return pTable; +} + +int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { + if (pMeta == NULL) return -1; + + STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid); + if (pTable == NULL) return -1; + + if (pTable->type == TSDB_SUPER_TABLE) { + // TODO: implement drop super table + return -1; + } else { + pMeta->tables[pTable->tableId.tid] = NULL; + pMeta->nTables--; + assert(pMeta->nTables >= 0); + if (pTable->type == TSDB_STABLE) { + tsdbRemoveTableFromIndex(pMeta, pTable); + } + + tsdbFreeTable(pTable); + } + + return 0; +} + +int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) { + tSkipListPut(pTable->content.pData, pNode); + return 0; +} + +static int tsdbFreeTable(STable *pTable) { + // TODO: finish this function + if (pTable->type == TSDB_STABLE) { + tdFreeDataRow(pTable->pTagVal); + } else { + tdFreeSchema(pTable->pSchema); } - if (pTable->tableId.uid != tableId.uid) { - // TODO: deal with the error here - return 0; + // Free content + if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) { + tSkipListDestroy(pTable->content.pIndex); + } else { + tSkipListDestroy(pTable->content.pData); } + free(pTable); return 0; } -static int tsdbFreeTable(STable *pTable) { return 0; } +static int32_t tsdbCheckTableCfg(STableCfg *pCfg) { + // TODO + return 0; +} -static int32_t tsdbCheckTableCfg(STableCfg *pCfg) { return 0; } +STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) { + void *ptr = taosGetDataFromHashTable(pMeta->tableMap, (char *)(&uid), sizeof(uid)); -static STable *tsdbGetTableByUid(int64_t uid) { return NULL; } + if (ptr == NULL) return NULL; -static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable) { - if (pTable->type == TSDB_SUPER_TABLE) { + return *(STable **)ptr; +} + +static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable) { + if (pTable->type == TSDB_SUPER_TABLE) { + // add super table to the linked list if (pMeta->stables == NULL) { pMeta->stables = pTable; pTable->next = NULL; @@ -163,20 +224,39 @@ static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable) { pTable->next = pTemp; } } else { + // add non-super table to the array pMeta->tables[pTable->tableId.tid] = pTable; if (pTable->type == TSDB_STABLE) { + // add STABLE to the index tsdbAddTableIntoIndex(pMeta, pTable); } + pMeta->nTables++; } return tsdbAddTableIntoMap(pMeta, pTable); } +static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { + // TODO + return 0; +} + static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) { // TODO: add the table to the map + int64_t uid = pTable->tableId.uid; + if (taosAddToHashTable(pMeta->tableMap, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) { + return -1; + } return 0; } static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { + assert(pTable->type == TSDB_STABLE); + // TODO + return 0; +} + +static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { + assert(pTable->type == TSDB_STABLE); // TODO return 0; } \ No newline at end of file diff --git a/src/vnode/tests/tsdb/CMakeLists.txt b/src/vnode/tsdb/tests/CMakeLists.txt similarity index 65% rename from src/vnode/tests/tsdb/CMakeLists.txt rename to src/vnode/tsdb/tests/CMakeLists.txt index a942dd917e7cb52c9c41162dffe7206d7f16e69c..51c15bce203eb920d7f377d261322047ccd008dc 100644 --- a/src/vnode/tests/tsdb/CMakeLists.txt +++ b/src/vnode/tsdb/tests/CMakeLists.txt @@ -1,9 +1,7 @@ aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) -message(STATUS "TSDB: ${SOURCE_LIST}") - add_executable(tsdbTests ${SOURCE_LIST}) -target_link_libraries(tsdbTests gtest gtest_main pthread tsdb) +target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb) add_test( NAME diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp new file mode 100644 index 0000000000000000000000000000000000000000..534b75bfb664bd5a3d3ab739f94d538a72ae6bf5 --- /dev/null +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -0,0 +1,93 @@ +#include +#include + +#include "tsdb.h" +#include "tsdbMeta.h" + +TEST(TsdbTest, DISABLED_createTable) { + STsdbMeta *pMeta = tsdbCreateMeta(100); + ASSERT_NE(pMeta, nullptr); + + STableCfg config; + config.tableId.tid = 0; + config.tableId.uid = 98868728187539L; + config.numOfCols = 5; + config.schema = tdNewSchema(config.numOfCols); + for (int i = 0; i < schemaNCols(config.schema); i++) { + SColumn *pCol = tdNewCol(TD_DATATYPE_BIGINT, i, 0); + tdColCpy(schemaColAt(config.schema, i), pCol); + tdFreeCol(pCol); + } + config.tagValues = nullptr; + + tsdbCreateTableImpl(pMeta, &config); + + STable *pTable = tsdbGetTableByUid(pMeta, config.tableId.uid); + ASSERT_NE(pTable, nullptr); +} + +TEST(TsdbTest, createRepo) { + STsdbCfg *pCfg = tsdbCreateDefaultCfg(); + + // Create a tsdb repository + tsdb_repo_t *pRepo = tsdbCreateRepo("/root/mnt/test/vnode0", pCfg, NULL); + ASSERT_NE(pRepo, nullptr); + tsdbFreeCfg(pCfg); + + // create a normal table in this repository + STableCfg config; + config.tableId.tid = 0; + config.tableId.uid = 98868728187539L; + config.numOfCols = 5; + config.schema = tdNewSchema(config.numOfCols); + SColumn *pCol = tdNewCol(TD_DATATYPE_TIMESTAMP, 0, 0); + tdColCpy(schemaColAt(config.schema, 0), pCol); + tdFreeCol(pCol); + for (int i = 1; i < schemaNCols(config.schema); i++) { + pCol = tdNewCol(TD_DATATYPE_BIGINT, i, 0); + tdColCpy(schemaColAt(config.schema, i), pCol); + tdFreeCol(pCol); + } + + tsdbCreateTable(pRepo, &config); + // Write some data + + int32_t size = sizeof(SSubmitMsg) + sizeof(SSubmitBlock) + tdMaxRowDataBytes(config.schema) * 10 + sizeof(int32_t); + + tdUpdateSchema(config.schema); + + SSubmitMsg *pMsg = (SSubmitMsg *)malloc(size); + pMsg->numOfTables = 1; // TODO: use api + + SSubmitBlock *pBlock = (SSubmitBlock *)pMsg->data; + pBlock->tableId = {.uid = 98868728187539L, .tid = 0}; + pBlock->sversion = 0; + pBlock->len = sizeof(SSubmitBlock); + + SDataRows rows = pBlock->data; + dataRowsInit(rows); + + SDataRow row = tdNewDataRow(tdMaxRowDataBytes(config.schema)); + int64_t ttime = 1583508800000; + for (int i = 0; i < 10; i++) { // loop over rows + ttime += (10000 * i); + tdDataRowReset(row); + for (int j = 0; j < schemaNCols(config.schema); j++) { + if (j == 0) { // set time stamp + tdAppendColVal(row, (void *)(&ttime), schemaColAt(config.schema, j), 40); + } else { // set other fields + int32_t val = 10; + tdAppendColVal(row, (void *)(&val), schemaColAt(config.schema, j), 40); + } + } + + tdDataRowsAppendRow(rows, row); + } + + tsdbInsertData(pRepo, pMsg); + + tdFreeDataRow(row); + + tdFreeSchema(config.schema); + tsdbDropRepo(pRepo); +} \ No newline at end of file diff --git a/src/vnode/wal/inc/vnodewal.h b/src/vnode/wal/inc/vnodewal.h index c6157b47a5e63c239a6d3805b884e38dbabb3b39..7753e4ecca5401bfc8434465f90cdb415d5757a7 100644 --- a/src/vnode/wal/inc/vnodewal.h +++ b/src/vnode/wal/inc/vnodewal.h @@ -1,6 +1,35 @@ -#if !defined(_TD_WAL_H_) +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_WAL_H_ #define _TD_WAL_H_ +#include +#ifdef __cplusplus +extern "C" { +#endif +typedef void walh; // WAL HANDLE -#endif // _TD_WAL_H_ +walh *vnodeOpenWal(int vnode, uint8_t op); +int vnodeCloseWal(walh *pWal); +int vnodeRenewWal(walh *pWal); +int vnodeWriteWal(walh *pWal, void *cont, int contLen); +int vnodeSyncWal(walh *pWal); + +#ifdef __cplusplus +} +#endif + +#endif // _TD_WAL_H_