提交 def4c131 编写于 作者: S slguan

Merge branch '2.0' into refact/slguan

......@@ -4,11 +4,11 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(os)
ADD_SUBDIRECTORY(util)
ADD_SUBDIRECTORY(rpc)
ADD_SUBDIRECTORY(client)
ADD_SUBDIRECTORY(kit)
ADD_SUBDIRECTORY(plugins)
ADD_SUBDIRECTORY(sdb)
ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY(dnode)
#ADD_SUBDIRECTORY(vnode)
# ADD_SUBDIRECTORY(client)
# ADD_SUBDIRECTORY(kit)
# ADD_SUBDIRECTORY(plugins)
# ADD_SUBDIRECTORY(sdb)
# ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY(vnode)
# ADD_SUBDIRECTORY(dnode)
#ADD_SUBDIRECTORY(connector/jdbc)
......@@ -10,7 +10,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http)
TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http tsdb)
#IF (TD_CLUSTER)
# TARGET_LINK_LIBRARIES(taosd dcluster)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TAOSARRAY_H
#define TDENGINE_TAOSARRAY_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#define TARRAY_MIN_SIZE 8
#define TARRAY_GET_ELEM(array, index) ((array)->pData + (index) * (array)->elemSize)
typedef struct SArray {
size_t size;
size_t capacity;
size_t elemSize;
void* pData;
} SArray;
/**
*
* @param size
* @param elemSize
* @return
*/
void* taosArrayInit(size_t size, size_t elemSize);
/**
*
* @param pArray
* @param pData
* @return
*/
void* taosArrayPush(SArray* pArray, void* pData);
/**
*
* @param pArray
*/
void taosArrayPop(SArray* pArray);
/**
*
* @param pArray
* @param index
* @return
*/
void* taosArrayGet(SArray* pArray, size_t index);
/**
*
* @param pArray
* @return
*/
size_t taosArrayGetSize(SArray* pArray);
/**
*
* @param pArray
* @param index
* @param pData
*/
void taosArrayInsert(SArray* pArray, int32_t index, void* pData);
/**
*
* @param pArray
*/
void taosArrayDestory(SArray* pArray);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TAOSARRAY_H
......@@ -20,59 +20,62 @@
extern "C" {
#endif
#define MAX_SKIP_LIST_LEVEL 20
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include "os.h"
#include "ttypes.h"
#include "tarray.h"
/*
* key of each node
* todo move to as the global structure in all search codes...
*/
#define MAX_SKIP_LIST_LEVEL 15
#define SKIP_LIST_RECORD_PERFORMANCE 0
typedef char *SSkipListKey;
typedef char *(*__sl_key_fn_t)(const void *);
/**
* the format of skip list node is as follows:
* +------------+-----------------------+------------------------+-----+------+
* | node level | forward pointer array | backward pointer array | key | data |
* +------------+-----------------------+------------------------+-----+------+
* the skiplist node is located in a consecutive memory area, key will not be copy to skip list
*/
typedef struct SSkipListNode {
uint8_t level;
} SSkipListNode;
const static size_t SKIP_LIST_STR_KEY_LENGTH_THRESHOLD = 15;
typedef tVariant tSkipListKey;
typedef enum tSkipListPointQueryType {
INCLUDE_POINT_QUERY,
EXCLUDE_POINT_QUERY,
} tSkipListPointQueryType;
#define SL_NODE_HEADER_SIZE(_l) (sizeof(SSkipListNode) + ((_l) << 1u) * POINTER_BYTES)
typedef struct tSkipListNode {
uint16_t nLevel;
char * pData;
#define SL_GET_FORWARD_POINTER(n, _l) ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode)))[(_l)]
#define SL_GET_BACKWARD_POINTER(n, _l) \
((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)]
struct tSkipListNode **pForward;
struct tSkipListNode **pBackward;
#define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level))
#define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n)))
tSkipListKey key;
} tSkipListNode;
#define SL_GET_NODE_LEVEL(n) *(int32_t *)((n))
/*
* @version 0.2
* @version 0.3
* @date 2017/11/12
* the simple version of SkipList.
* the simple version of skip list.
*
* for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate
* deterministic result. Later, we will remove the lock in SkipList to further
* enhance the performance. In this case, one should use the concurrent skip list (by
* using michael-scott algorithm) instead of this simple version in a multi-thread
* environment, to achieve higher performance of read/write operations.
* deterministic result. Later, we will remove the lock in SkipList to further enhance the performance.
* In this case, one should use the concurrent skip list (by using michael-scott algorithm) instead of
* this simple version in a multi-thread environment, to achieve higher performance of read/write operations.
*
* Note: Duplicated primary key situation.
* In case of duplicated primary key, two ways can be employed to handle this situation:
* 1. add as normal insertion with out special process.
* 2. add an overflow pointer at each list node, all nodes with the same key will be added
* in the overflow pointer. In this case, the total steps of each search will be reduced significantly.
* 1. add as normal insertion without special process.
* 2. add an overflow pointer at each list node, all nodes with the same key will be added in the overflow pointer.
* In this case, the total steps of each search will be reduced significantly.
* Currently, we implement the skip list in a line with the first means, maybe refactor it soon.
*
* Memory consumption: the memory alignment causes many memory wasted. So, employ a memory
* pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs.
*
* 3. use the iterator pattern to refactor all routines to make it more clean
*/
// state struct, record following information:
......@@ -81,7 +84,7 @@ typedef struct tSkipListNode {
// avg search rsp time, for latest 1000 queries
// total memory size
typedef struct tSkipListState {
// in bytes, sizeof(tSkipList)+sizeof(tSkipListNode)*tSkipList->nSize
// in bytes, sizeof(SSkipList)+sizeof(SSkipListNode)*SSkipList->nSize
uint64_t nTotalMemSize;
uint64_t nLevelNodeCnt[MAX_SKIP_LIST_LEVEL];
uint64_t queryCount; // total query count
......@@ -101,68 +104,95 @@ typedef struct tSkipListState {
uint64_t nTotalElapsedTimeForInsert;
} tSkipListState;
typedef struct tSkipList {
tSkipListNode pHead;
uint64_t nSize;
uint16_t nMaxLevel;
uint16_t nLevel;
uint16_t keyType;
uint16_t nMaxKeyLen;
typedef struct SSkipListKeyInfo {
uint8_t dupKey : 2; // if allow duplicated key in the skip list
uint8_t type : 6; // key type
uint8_t len; // maximum key length, used in case of string key
} SSkipListKeyInfo;
typedef struct SSkipList {
__compar_fn_t comparFn;
__sl_key_fn_t keyFn;
uint32_t size;
uint8_t maxLevel;
uint8_t level;
SSkipListKeyInfo keyInfo;
__compar_fn_t comparator;
pthread_rwlock_t lock; // will be removed soon
pthread_rwlock_t *lock;
SSkipListNode * pHead;
#if SKIP_LIST_RECORD_PERFORMANCE
tSkipListState state; // skiplist state
} tSkipList;
#endif
} SSkipList;
/*
* iterate the skiplist
* this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may
* continue iterating the skiplist, so add the reference count for skiplist
* TODO add the ref for skiplist when one iterator is created
* TODO add the ref for skip list when one iterator is created
*/
typedef struct SSkipListIterator {
tSkipList * pSkipList;
tSkipListNode *cur;
SSkipList * pSkipList;
SSkipListNode *cur;
int64_t num;
} SSkipListIterator;
/*
* query condition structure to denote the range query
* todo merge the point query cond with range query condition
/**
*
* @param nMaxLevel maximum skip list level
* @param keyType type of key
* @param dupKey allow the duplicated key in the skip list
* @return
*/
typedef struct tSKipListQueryCond {
// when the upper bounding == lower bounding, it is a point query
tSkipListKey lowerBnd;
tSkipListKey upperBnd;
int32_t lowerBndRelOptr; // relation operator to denote if lower bound is
int32_t upperBndRelOptr; // included or not
} tSKipListQueryCond;
tSkipList *tSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen);
SSkipList *tSkipListCreate(uint8_t nMaxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t threadsafe,
__sl_key_fn_t fn);
void *tSkipListDestroy(tSkipList *pSkipList);
// create skip list key
tSkipListKey tSkipListCreateKey(int32_t type, char *val, size_t keyLength);
// destroy skip list key
void tSkipListDestroyKey(tSkipListKey *pKey);
/**
*
* @param pSkipList
* @return NULL will always be returned
*/
void *tSkipListDestroy(SSkipList *pSkipList);
// put data into skiplist
tSkipListNode *tSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey);
/**
*
* @param pSkipList
* @param level
* @param headSize
*/
void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize);
/*
* get only *one* node of which key is equalled to pKey, even there are more
* than one nodes are of the same key
/**
* put the skip list node into the skip list.
* If failed, NULL will be returned, otherwise, the pNode will be returned.
*
* @param pSkipList
* @param pNode
* @return
*/
tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey);
SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode);
/*
* get all data with the same keys
/**
* get only *one* node of which key is equalled to pKey, even there are more than one nodes are of the same key
*
* @param pSkipList
* @param pKey
* @param keyType
* @return
*/
int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes);
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType);
int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *),
/**
*
* @param pSkipList
* @param pRes
* @param fp
* @param param
* @return
*/
int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *),
void *param);
/*
......@@ -173,30 +203,16 @@ int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (
* true: one node has been removed
* false: no node has been removed
*/
bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey);
bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey);
/*
* remove the specified node in parameters
*/
void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode);
// for debug purpose only
void tSkipListPrint(tSkipList *pSkipList, int16_t nlevel);
/*
* range query & single point query function
*/
int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult);
/*
* include/exclude point query
*/
int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type,
tSkipListNode ***pResult);
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode);
int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator *iter);
int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter);
bool tSkipListIteratorNext(SSkipListIterator *iter);
tSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter);
SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tarray.h"
void* taosArrayInit(size_t size, size_t elemSize) {
assert(elemSize > 0);
if (size < TARRAY_MIN_SIZE) {
size = TARRAY_MIN_SIZE;
}
SArray* pArray = calloc(1, sizeof(SArray));
if (pArray == NULL) {
return NULL;
}
pArray->pData = calloc(size, elemSize * size);
if (pArray->pData == NULL) {
free(pArray);
return NULL;
}
pArray->capacity = size;
pArray->elemSize = elemSize;
return pArray;
}
static void taosArrayResize(SArray* pArray) {
assert(pArray->size >= pArray->capacity);
size_t size = pArray->capacity;
size = (size << 1u);
void* tmp = realloc(pArray->pData, size * pArray->elemSize);
if (tmp == NULL) {
// todo
}
pArray->pData = tmp;
pArray->capacity = size;
}
void* taosArrayPush(SArray* pArray, void* pData) {
if (pArray == NULL || pData == NULL) {
return NULL;
}
if (pArray->size >= pArray->capacity) {
taosArrayResize(pArray);
}
void* dst = TARRAY_GET_ELEM(pArray, pArray->size);
memcpy(dst, pData, pArray->elemSize);
pArray->size += 1;
return dst;
}
void taosArrayPop(SArray* pArray) {
if (pArray == NULL || pArray->size == 0) {
return;
}
pArray->size -= 1;
}
void* taosArrayGet(SArray* pArray, size_t index) {
assert(index < pArray->size);
return TARRAY_GET_ELEM(pArray, index);
}
size_t taosArrayGetSize(SArray* pArray) { return pArray->size; }
void taosArrayInsert(SArray* pArray, int32_t index, void* pData) {
if (pArray == NULL || pData == NULL) {
return;
}
if (index >= pArray->size) {
taosArrayPush(pArray, pData);
return;
}
if (pArray->size >= pArray->capacity) {
taosArrayResize(pArray);
}
void* dst = TARRAY_GET_ELEM(pArray, index);
int32_t remain = pArray->size - index;
memmove(dst + pArray->elemSize, dst, pArray->elemSize * remain);
memcpy(dst, pData, pArray->elemSize);
pArray->size += 1;
}
void taosArrayDestory(SArray* pArray) {
if (pArray == NULL) {
return;
}
free(pArray->pData);
free(pArray);
}
......@@ -15,49 +15,54 @@
#include "os.h"
#include "tlog.h"
#include "taosdef.h"
// #include "tsdb.h"
#include "tskiplist.h"
#include "tutil.h"
static FORCE_INLINE void recordNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { // record link count in each level
for (int32_t i = 0; i < nLevel; ++i) {
static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level
#if SKIP_LIST_RECORD_PERFORMANCE
for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]++;
}
#endif
}
static FORCE_INLINE void removeNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) {
for (int32_t i = 0; i < nLevel; ++i) {
static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) {
#if SKIP_LIST_RECORD_PERFORMANCE
for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]--;
}
#endif
}
static FORCE_INLINE int32_t getSkipListNodeRandomHeight(tSkipList *pSkipList) {
static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) {
const uint32_t factor = 4;
int32_t n = 1;
while ((rand() % factor) == 0 && n <= pSkipList->nMaxLevel) {
while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) {
n++;
}
return n;
}
static FORCE_INLINE int32_t getSkipListNodeLevel(tSkipList *pSkipList) {
int32_t nLevel = getSkipListNodeRandomHeight(pSkipList);
if (pSkipList->nSize == 0) {
nLevel = 1;
pSkipList->nLevel = 1;
static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) {
int32_t level = getSkipListNodeRandomHeight(pSkipList);
if (pSkipList->size == 0) {
level = 1;
pSkipList->level = 1;
} else {
if (nLevel > pSkipList->nLevel && pSkipList->nLevel < pSkipList->nMaxLevel) {
nLevel = (++pSkipList->nLevel);
if (level > pSkipList->level && pSkipList->level < pSkipList->maxLevel) {
level = (++pSkipList->level);
}
}
return nLevel;
return level;
}
void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode);
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t level, SSkipListNode *pNode);
void tSkipListDoRecordPut(tSkipList *pSkipList) {
void tSkipListDoRecordPut(SSkipList *pSkipList) {
#if SKIP_LIST_RECORD_PERFORMANCE
const int32_t MAX_RECORD_NUM = 1000;
if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) {
......@@ -67,61 +72,38 @@ void tSkipListDoRecordPut(tSkipList *pSkipList) {
} else {
pSkipList->state.nInsertObjs++;
}
#endif
}
int32_t compareIntVal(const void *pLeft, const void *pRight) {
int64_t lhs = ((tSkipListKey *)pLeft)->i64Key;
int64_t rhs = ((tSkipListKey *)pRight)->i64Key;
DEFAULT_COMP(lhs, rhs);
}
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) {
int64_t lhs = ((tSkipListKey *)pLeft)->i64Key;
double rhs = ((tSkipListKey *)pRight)->dKey;
if (fabs(lhs - rhs) < FLT_EPSILON) {
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight);
if (ret == 0) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
return ret > 0 ? 1 : -1;
}
}
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) {
double lhs = ((tSkipListKey *)pLeft)->dKey;
int64_t rhs = ((tSkipListKey *)pRight)->i64Key;
if (fabs(lhs - rhs) < FLT_EPSILON) {
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight);
if (ret == 0) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
return ret > 0 ? 1 : -1;
}
}
int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
double ret = (((tSkipListKey *)pLeft)->dKey - ((tSkipListKey *)pRight)->dKey);
if (fabs(ret) < FLT_EPSILON) {
int32_t compareInt16Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT16_VAL(pLeft) - GET_INT16_VAL(pRight);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
int32_t compareStrVal(const void *pLeft, const void *pRight) {
tSkipListKey *pL = (tSkipListKey *)pLeft;
tSkipListKey *pR = (tSkipListKey *)pRight;
if (pL->nLen == 0 && pR->nLen == 0) {
return 0;
}
//handle only one-side bound compare situation, there is only lower bound or only upper bound
if (pL->nLen == -1) {
return 1; // no lower bound, lower bound is minimum, always return -1;
} else if (pR->nLen == -1) {
return -1; // no upper bound, upper bound is maximum situation, always return 1;
}
int32_t ret = strcmp(((tSkipListKey *)pLeft)->pz, ((tSkipListKey *)pRight)->pz);
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT8_VAL(pLeft) - GET_INT8_VAL(pRight);
if (ret == 0) {
return 0;
} else {
......@@ -129,716 +111,692 @@ int32_t compareStrVal(const void *pLeft, const void *pRight) {
}
}
int32_t compareWStrVal(const void *pLeft, const void *pRight) {
tSkipListKey *pL = (tSkipListKey *)pLeft;
tSkipListKey *pR = (tSkipListKey *)pRight;
if (pL->nLen == 0 && pR->nLen == 0) {
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) {
// int64_t lhs = ((SSkipListKey *)pLeft)->i64Key;
// double rhs = ((SSkipListKey *)pRight)->dKey;
// if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0;
// } else {
// return (lhs > rhs) ? 1 : -1;
// }
return 0;
}
//handle only one-side bound compare situation, there is only lower bound or only upper bound
if (pL->nLen == -1) {
return 1; // no lower bound, lower bound is minimum, always return -1;
} else if (pR->nLen == -1) {
return -1; // no upper bound, upper bound is maximum situation, always return 1;
}
}
int32_t ret = wcscmp(((tSkipListKey *)pLeft)->wpz, ((tSkipListKey *)pRight)->wpz);
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) {
// double lhs = ((SSkipListKey *)pLeft)->dKey;
// int64_t rhs = ((SSkipListKey *)pRight)->i64Key;
// if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0;
// } else {
// return (lhs > rhs) ? 1 : -1;
// }
return 0;
}
if (ret == 0) {
int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
double ret = GET_DOUBLE_VAL(pLeft) - GET_DOUBLE_VAL(pRight);
if (fabs(ret) < FLT_EPSILON) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
static __compar_fn_t getKeyFilterComparator(tSkipList *pSkipList, int32_t filterDataType) {
__compar_fn_t comparator = NULL;
int32_t compareStrVal(const void *pLeft, const void *pRight) {
// SSkipListKey *pL = (SSkipListKey *)pLeft;
// SSkipListKey *pR = (SSkipListKey *)pRight;
//
// if (pL->nLen == 0 && pR->nLen == 0) {
// return 0;
// }
//
// // handle only one-side bound compare situation, there is only lower bound or only upper bound
// if (pL->nLen == -1) {
// return 1; // no lower bound, lower bound is minimum, always return -1;
// } else if (pR->nLen == -1) {
// return -1; // no upper bound, upper bound is maximum situation, always return 1;
// }
//
// int32_t ret = strcmp(((SSkipListKey *)pLeft)->pz, ((SSkipListKey *)pRight)->pz);
//
// if (ret == 0) {
// return 0;
// } else {
// return ret > 0 ? 1 : -1;
// }
return 0;
}
switch (pSkipList->keyType) {
int32_t compareWStrVal(const void *pLeft, const void *pRight) {
// SSkipListKey *pL = (SSkipListKey *)pLeft;
// SSkipListKey *pR = (SSkipListKey *)pRight;
//
// if (pL->nLen == 0 && pR->nLen == 0) {
// return 0;
// }
//
// // handle only one-side bound compare situation, there is only lower bound or only upper bound
// if (pL->nLen == -1) {
// return 1; // no lower bound, lower bound is minimum, always return -1;
// } else if (pR->nLen == -1) {
// return -1; // no upper bound, upper bound is maximum situation, always return 1;
// }
//
// int32_t ret = wcscmp(((SSkipListKey *)pLeft)->wpz, ((SSkipListKey *)pRight)->wpz);
//
// if (ret == 0) {
// return 0;
// } else {
// return ret > 0 ? 1 : -1;
// }
return 0;
}
static __compar_fn_t getKeyFilterComparator(SSkipList *pSkipList, int32_t filterDataType) {
__compar_fn_t comparFn = NULL;
switch (pSkipList->keyInfo.type) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_BIGINT: {
if (filterDataType == TSDB_DATA_TYPE_BIGINT) {
comparFn = compareInt64Val;
break;
}
}
case TSDB_DATA_TYPE_BOOL: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparator = compareIntVal;
comparFn = compareInt32Val;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparator = compareIntDoubleVal;
comparFn = compareIntDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparator = compareDoubleIntVal;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparator = compareDoubleVal;
// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
// comparFn = compareDoubleIntVal;
// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
// comparFn = compareDoubleVal;
// }
if (filterDataType == TSDB_DATA_TYPE_DOUBLE) {
comparFn = compareDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_BINARY:
comparator = compareStrVal;
comparFn = compareStrVal;
break;
case TSDB_DATA_TYPE_NCHAR:
comparator = compareWStrVal;
comparFn = compareWStrVal;
break;
default:
comparator = compareIntVal;
comparFn = compareInt32Val;
break;
}
return comparator;
return comparFn;
}
static __compar_fn_t getKeyComparator(int32_t keyType) {
__compar_fn_t comparator = NULL;
__compar_fn_t comparFn = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
comparFn = compareInt8Val;
break;
case TSDB_DATA_TYPE_SMALLINT:
comparFn = compareInt16Val;
break;
case TSDB_DATA_TYPE_INT:
comparFn = compareInt32Val;
break;
case TSDB_DATA_TYPE_BIGINT:
comparFn = compareInt64Val;
break;
case TSDB_DATA_TYPE_BOOL:
comparator = compareIntVal;
comparFn = compareInt32Val;
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
comparator = compareDoubleVal;
comparFn = compareDoubleVal;
break;
case TSDB_DATA_TYPE_BINARY:
comparator = compareStrVal;
comparFn = compareStrVal;
break;
case TSDB_DATA_TYPE_NCHAR:
comparator = compareWStrVal;
comparFn = compareWStrVal;
break;
default:
comparator = compareIntVal;
comparFn = compareInt32Val;
break;
}
return comparator;
return comparFn;
}
tSkipList* tSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen) {
tSkipList *pSkipList = (tSkipList *)calloc(1, sizeof(tSkipList));
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, uint8_t dupKey, uint8_t lock,
__sl_key_fn_t fn) {
SSkipList *pSkipList = (SSkipList *)calloc(1, sizeof(SSkipList));
if (pSkipList == NULL) {
return NULL;
}
pSkipList->keyType = keyType;
if (maxLevel > MAX_SKIP_LIST_LEVEL) {
maxLevel = MAX_SKIP_LIST_LEVEL;
}
pSkipList->keyInfo = (SSkipListKeyInfo){.type = keyType, .len = keyLen, .dupKey = dupKey};
pSkipList->keyFn = fn;
pSkipList->comparator = getKeyComparator(keyType);
pSkipList->pHead.pForward = (tSkipListNode **)calloc(1, POINTER_BYTES * MAX_SKIP_LIST_LEVEL);
pSkipList->comparFn = getKeyComparator(keyType);
pSkipList->maxLevel = maxLevel;
pSkipList->level = 1;
pSkipList->nMaxLevel = MAX_SKIP_LIST_LEVEL;
pSkipList->nLevel = 1;
pSkipList->pHead = (SSkipListNode *)calloc(1, SL_NODE_HEADER_SIZE(maxLevel));
pSkipList->pHead->level = pSkipList->maxLevel;
pSkipList->nMaxKeyLen = nMaxKeyLen;
pSkipList->nMaxLevel = nMaxLevel;
if (lock) {
pSkipList->lock = calloc(1, sizeof(pthread_rwlock_t));
if (pthread_rwlock_init(&pSkipList->lock, NULL) != 0) {
tfree(pSkipList->pHead.pForward);
if (pthread_rwlock_init(pSkipList->lock, NULL) != 0) {
tfree(pSkipList->pHead);
tfree(pSkipList);
return NULL;
}
}
srand(time(NULL));
pSkipList->state.nTotalMemSize += sizeof(tSkipList);
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.nTotalMemSize += sizeof(SSkipList);
#endif
return pSkipList;
}
static void doRemove(tSkipList *pSkipList, tSkipListNode *pNode, tSkipListNode *forward[]) {
int32_t level = pNode->nLevel;
for (int32_t j = level - 1; j >= 0; --j) {
if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) {
forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j];
// static void doRemove(SSkipList *pSkipList, SSkipListNode *pNode, SSkipListNode *forward[]) {
// int32_t level = pNode->level;
// for (int32_t j = level - 1; j >= 0; --j) {
// if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) {
// forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j];
// }
//
// if (forward[j]->pForward[j] != NULL) {
// forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j];
// }
// }
//
// pSkipList->state.nTotalMemSize -= (sizeof(SSkipListNode) + POINTER_BYTES * pNode->level * 2);
// removeNodeEachLevel(pSkipList, pNode->level);
//
// tfree(pNode);
// --pSkipList->size;
//}
void *tSkipListDestroy(SSkipList *pSkipList) {
if (pSkipList == NULL) {
return NULL;
}
if (forward[j]->pForward[j] != NULL) {
forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j];
if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock);
}
}
pSkipList->state.nTotalMemSize -= (sizeof(tSkipListNode) + POINTER_BYTES * pNode->nLevel * 2);
removeNodeEachLevel(pSkipList, pNode->nLevel);
tfree(pNode);
--pSkipList->nSize;
}
SSkipListNode *pNode = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0); // pSkipList->pHead.pForward[0];
static size_t getOneNodeSize(const tSkipListKey *pKey, int32_t nLevel) {
size_t size = sizeof(tSkipListNode) + sizeof(intptr_t) * (nLevel << 1);
if (pKey->nType == TSDB_DATA_TYPE_BINARY) {
size += pKey->nLen + 1;
} else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) {
size += (pKey->nLen + 1) * TSDB_NCHAR_SIZE;
while (pNode) {
SSkipListNode *pTemp = pNode;
pNode = SL_GET_FORWARD_POINTER(pNode, 0);
tfree(pTemp);
}
return size;
}
static tSkipListNode *tSkipListCreateNode(void *pData, const tSkipListKey *pKey, int32_t nLevel) {
size_t nodeSize = getOneNodeSize(pKey, nLevel);
tSkipListNode *pNode = (tSkipListNode *)calloc(1, nodeSize);
pNode->pForward = (tSkipListNode **)(&pNode[1]);
pNode->pBackward = (pNode->pForward + nLevel);
pNode->pData = pData;
tfree(pSkipList->pHead);
pNode->key = *pKey;
if (pKey->nType == TSDB_DATA_TYPE_BINARY) {
pNode->key.pz = (char *)(pNode->pBackward + nLevel);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
pthread_rwlock_destroy(pSkipList->lock);
strcpy(pNode->key.pz, pKey->pz);
pNode->key.pz[pKey->nLen] = 0;
} else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) {
pNode->key.wpz = (wchar_t *)(pNode->pBackward + nLevel);
wcsncpy(pNode->key.wpz, pKey->wpz, pKey->nLen);
pNode->key.wpz[pKey->nLen] = 0;
tfree(pSkipList->lock);
}
pNode->nLevel = nLevel;
return pNode;
}
tSkipListKey tSkipListCreateKey(int32_t type, char *val, size_t keyLength) {
tSkipListKey k = {0};
tVariantCreateFromBinary(&k, val, (uint32_t) keyLength, (uint32_t) type);
return k;
tfree(pSkipList->pHead);
tfree(pSkipList);
return NULL;
}
void tSkipListDestroyKey(tSkipListKey *pKey) { tVariantDestroy(pKey); }
void* tSkipListDestroy(tSkipList *pSkipList) {
void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) {
if (pSkipList == NULL) {
return NULL;
}
pthread_rwlock_wrlock(&pSkipList->lock);
tSkipListNode *pNode = pSkipList->pHead.pForward[0];
while (pNode) {
tSkipListNode *pTemp = pNode;
pNode = pNode->pForward[0];
tfree(pTemp);
return;
}
tfree(pSkipList->pHead.pForward);
pthread_rwlock_unlock(&pSkipList->lock);
pthread_rwlock_destroy(&pSkipList->lock);
tfree(pSkipList);
return NULL;
*level = getSkipListRandLevel(pSkipList);
*headSize = SL_NODE_HEADER_SIZE(*level);
}
tSkipListNode *tSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey) {
SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
if (pSkipList == NULL) {
return NULL;
}
pthread_rwlock_wrlock(&pSkipList->lock);
if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock);
}
// record one node is put into skiplist
tSkipListDoRecordPut(pSkipList);
tSkipListNode *px = &pSkipList->pHead;
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) {
while (px->pForward[i] != NULL && (pSkipList->comparator(&px->pForward[i]->key, pKey) < 0)) {
px = px->pForward[i];
}
pSkipList->state.nTotalStepsForInsert++;
forward[i] = px;
}
// if the skiplist does not allowed identical key inserted, the new data will be discarded.
if ((insertIdenticalKey == 0) && forward[0] != &pSkipList->pHead &&
(pSkipList->comparator(&forward[0]->key, pKey) == 0)) {
pthread_rwlock_unlock(&pSkipList->lock);
return forward[0];
}
int32_t nLevel = getSkipListNodeLevel(pSkipList);
recordNodeEachLevel(pSkipList, nLevel);
tSkipListNode *pNode = tSkipListCreateNode(pData, pKey, nLevel);
tSkipListDoInsert(pSkipList, forward, nLevel, pNode);
SSkipListNode *px = pSkipList->pHead;
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
pSkipList->nSize += 1;
// char tmpstr[512] = {0};
// tVariantToString(&pNode->key, tmpstr);
// pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList,
// tmpstr, pSkipList->nSize);
pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, nLevel);
pthread_rwlock_unlock(&pSkipList->lock);
return pNode;
}
void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode) {
for (int32_t i = 0; i < nLevel; ++i) {
tSkipListNode *x = forward[i];
if (x != NULL) {
pNode->pBackward[i] = x;
if (x->pForward[i]) x->pForward[i]->pBackward[i] = pNode;
pNode->pForward[i] = x->pForward[i];
x->pForward[i] = pNode;
} else {
pSkipList->pHead.pForward[i] = pNode;
pNode->pBackward[i] = &(pSkipList->pHead);
}
}
}
tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey) {
int32_t sLevel = pSkipList->nLevel - 1;
int32_t ret = -1;
tSkipListNode *x = &pSkipList->pHead;
pthread_rwlock_rdlock(&pSkipList->lock);
pSkipList->state.queryCount++;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
for (int32_t i = sLevel; i >= 0; --i) {
while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) {
x = x->pForward[i];
}
if (ret == 0) {
pthread_rwlock_unlock(&pSkipList->lock);
return x->pForward[i];
}
}
pthread_rwlock_unlock(&pSkipList->lock);
return NULL;
}
static int32_t tSkipListEndParQuery(tSkipList *pSkipList, tSkipListNode *pStartNode, tSkipListKey *pEndKey,
int32_t cond, tSkipListNode ***pRes) {
pthread_rwlock_rdlock(&pSkipList->lock);
tSkipListNode *p = pStartNode;
int32_t numOfRes = 0;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pEndKey->nType);
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i);
while (p != NULL) {
int32_t ret = filterComparator(&p->key, pEndKey);
if (ret > 0) {
break;
}
char *key = SL_GET_NODE_KEY(pSkipList, p);
char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode);
if (ret < 0) {
numOfRes++;
p = p->pForward[0];
} else if (ret == 0) {
if (cond == TSDB_RELATION_LESS_EQUAL) {
numOfRes++;
p = p->pForward[0];
// if the forward element is less than the specified key, forward one step
if (pSkipList->comparFn(key, newDatakey) < 0) {
px = p;
p = SL_GET_FORWARD_POINTER(px, i);
} else {
break;
}
}
}
(*pRes) = (tSkipListNode **)malloc(POINTER_BYTES * numOfRes);
for (int32_t i = 0; i < numOfRes; ++i) {
(*pRes)[i] = pStartNode;
pStartNode = pStartNode->pForward[0];
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.nTotalStepsForInsert++;
#endif
forward[i] = px;
}
pthread_rwlock_unlock(&pSkipList->lock);
return numOfRes;
}
/*
* maybe return the copy of tSkipListNode would be better
*/
int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes) {
(*pRes) = NULL;
// if the skip list does not allowed identical key inserted, the new data will be discarded.
if (pSkipList->keyInfo.dupKey == 0 && forward[0] != pSkipList->pHead) {
char *key = SL_GET_NODE_KEY(pSkipList, forward[0]);
char *pNewDataKey = SL_GET_NODE_KEY(pSkipList, pNode);
tSkipListNode *pNode = tSkipListGetOne(pSkipList, pKey);
if (pNode == NULL) {
return 0;
if (pSkipList->comparFn(key, pNewDataKey) == 0) {
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
// backward check if previous nodes are with the same value.
tSkipListNode *pPrev = pNode->pBackward[0];
while ((pPrev != &pSkipList->pHead) && filterComparator(&pPrev->key, pKey) == 0) {
pPrev = pPrev->pBackward[0];
return forward[0];
}
}
return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
}
#if SKIP_LIST_RECORD_PERFORMANCE
recordNodeEachLevel(pSkipList, level);
#endif
static tSkipListNode *tSkipListParQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t cond) {
int32_t sLevel = pSkipList->nLevel - 1;
int32_t ret = -1;
int32_t level = SL_GET_NODE_LEVEL(pNode);
tSkipListDoInsert(pSkipList, forward, level, pNode);
tSkipListNode *x = &pSkipList->pHead;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
atomic_add_fetch_32(&pSkipList->size, 1);
pthread_rwlock_rdlock(&pSkipList->lock);
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, level);
#endif
if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) {
for (int32_t i = sLevel; i >= 0; --i) {
while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) {
x = x->pForward[i];
}
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
// backward check if previous nodes are with the same value.
if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) {
tSkipListNode *pNode = x->pForward[0];
while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparator(&pNode->pBackward[0]->key, pKey) == 0)) {
pNode = pNode->pBackward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return pNode;
}
if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) {
pthread_rwlock_unlock(&pSkipList->lock);
return x->pForward[0];
} else { // cond == TSDB_RELATION_LARGE && ret == 0
tSkipListNode *pn = x->pForward[0];
while (pn != NULL && filterComparator(&pn->key, pKey) == 0) {
pn = pn->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return pn;
}
}
pthread_rwlock_unlock(&pSkipList->lock);
return NULL;
}
int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *),
void *param) {
(*pRes) = (tSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize);
if (NULL == *pRes) {
pError("error skiplist %p, malloc failed", pSkipList);
return -1;
}
pthread_rwlock_rdlock(&pSkipList->lock);
tSkipListNode *pStartNode = pSkipList->pHead.pForward[0];
int32_t num = 0;
for (int32_t i = 0; i < pSkipList->nSize; ++i) {
if (pStartNode == NULL) {
pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1);
#ifdef _DEBUG_VIEW
tSkipListPrint(pSkipList, 1);
#endif
break;
}
void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t level, SSkipListNode *pNode) {
for (int32_t i = 0; i < level; ++i) {
SSkipListNode *x = forward[i];
if (x != NULL) {
SL_GET_BACKWARD_POINTER(pNode, i) = x;
if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) {
(*pRes)[num++] = pStartNode;
SSkipListNode *pForward = SL_GET_FORWARD_POINTER(x, i);
if (pForward) {
SL_GET_BACKWARD_POINTER(pForward, i) = pNode;
}
pStartNode = pStartNode->pForward[0];
SL_GET_FORWARD_POINTER(pNode, i) = SL_GET_FORWARD_POINTER(x, i);
SL_GET_FORWARD_POINTER(x, i) = pNode;
} else {
SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode;
SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead);
}
pthread_rwlock_unlock(&pSkipList->lock);
if (num == 0) {
free(*pRes);
*pRes = NULL;
} else if (num < pSkipList->nSize) { // free unused memory
char* tmp = realloc((*pRes), num * POINTER_BYTES);
assert(tmp != NULL);
*pRes = (tSkipListNode**)tmp;
}
return num;
}
int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator* iter) {
if (pSkipList == NULL) {
return -1;
}
iter->pSkipList = pSkipList;
pthread_rwlock_rdlock(&pSkipList->lock);
iter->cur = NULL;//pSkipList->pHead.pForward[0];
iter->num = pSkipList->nSize;
pthread_rwlock_unlock(&pSkipList->lock);
return 0;
}
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
int32_t sLevel = pSkipList->level - 1;
int32_t ret = -1;
bool tSkipListIteratorNext(SSkipListIterator* iter) {
if (iter->num == 0 || iter->pSkipList == NULL) {
return false;
}
// result list
SArray* sa = taosArrayInit(1, POINTER_BYTES);
tSkipList* pSkipList = iter->pSkipList;
SSkipListNode *pNode = pSkipList->pHead;
pthread_rwlock_rdlock(&pSkipList->lock);
if (iter->cur == NULL) {
iter->cur = pSkipList->pHead.pForward[0];
} else {
iter->cur = iter->cur->pForward[0];
if (pSkipList->lock) {
pthread_rwlock_rdlock(pSkipList->lock);
}
pthread_rwlock_unlock(&pSkipList->lock);
return iter->cur != NULL;
}
tSkipListNode* tSkipListIteratorGet(SSkipListIterator* iter) {
return iter->cur;
}
int32_t tSkipListRangeQuery(tSkipList *pSkipList, tSKipListQueryCond *pCond, tSkipListNode ***pRes) {
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList->state.queryCount++;
tSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
if (pStart == 0) {
*pRes = NULL;
return 0;
}
return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes);
}
#endif
static bool removeSupport(tSkipList *pSkipList, tSkipListNode **forward, tSkipListKey *pKey) {
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
__compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, keyType);
if (filterComparator(&forward[0]->pForward[0]->key, pKey) == 0) {
tSkipListNode *p = forward[0]->pForward[0];
doRemove(pSkipList, p, forward);
} else { // failed to find the node of specified value,abort
return false;
for (int32_t i = sLevel; i >= 0; --i) {
SSkipListNode *pNext = SL_GET_FORWARD_POINTER(pNode, i);
while (pNext != NULL) {
char *key = SL_GET_NODE_KEY(pSkipList, pNext);
if ((ret = filterComparFn(key, pKey)) < 0) {
pNode = pNext;
pNext = SL_GET_FORWARD_POINTER(pNext, i);
} else {
break;
}
// compress the minimum level of skip list
while (pSkipList->nLevel > 0 && pSkipList->pHead.pForward[pSkipList->nLevel - 1] == NULL) {
pSkipList->nLevel -= 1;
}
return true;
}
void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode) {
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
pthread_rwlock_rdlock(&pSkipList->lock);
for (int32_t i = 0; i < pNode->nLevel; ++i) {
forward[i] = pNode->pBackward[i];
// find the qualified key
if (ret == 0) {
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
removeSupport(pSkipList, forward, &pNode->key);
pthread_rwlock_unlock(&pSkipList->lock);
}
bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey) {
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
pthread_rwlock_rdlock(&pSkipList->lock);
SSkipListNode* pResult = SL_GET_FORWARD_POINTER(pNode, i);
taosArrayPush(sa, &pResult);
tSkipListNode *x = &pSkipList->pHead;
for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) {
while (x->pForward[i] != NULL && (filterComparator(&x->pForward[i]->key, pKey) < 0)) {
x = x->pForward[i];
// skip list does not allowed duplicated key, abort further retrieve data
if (!pSkipList->keyInfo.dupKey) {
break;
}
}
forward[i] = x;
}
bool ret = removeSupport(pSkipList, forward, pKey);
pthread_rwlock_unlock(&pSkipList->lock);
return ret;
}
void tSkipListPrint(tSkipList *pSkipList, int16_t nlevel) {
if (pSkipList == NULL || pSkipList->nLevel < nlevel || nlevel <= 0) {
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return sa;
}
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
// int32_t cond, SSkipListNode ***pRes) {
// pthread_rwlock_rdlock(&pSkipList->lock);
// SSkipListNode *p = pStartNode;
// int32_t numOfRes = 0;
//
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pEndKey->nType);
// while (p != NULL) {
// int32_t ret = filterComparFn(&p->key, pEndKey);
// if (ret > 0) {
// break;
// }
//
// if (ret < 0) {
// numOfRes++;
// p = p->pForward[0];
// } else if (ret == 0) {
// if (cond == TSDB_RELATION_LESS_EQUAL) {
// numOfRes++;
// p = p->pForward[0];
// } else {
// break;
// }
// }
// }
//
// (*pRes) = (SSkipListNode **)malloc(POINTER_BYTES * numOfRes);
// for (int32_t i = 0; i < numOfRes; ++i) {
// (*pRes)[i] = pStartNode;
// pStartNode = pStartNode->pForward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return numOfRes;
//}
//
///*
// * maybe return the copy of SSkipListNode would be better
// */
// int32_t tSkipListGets(SSkipList *pSkipList, SSkipListKey *pKey, SSkipListNode ***pRes) {
// (*pRes) = NULL;
//
// SSkipListNode *pNode = tSkipListGet(pSkipList, pKey);
// if (pNode == NULL) {
// return 0;
// }
//
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType);
//
// // backward check if previous nodes are with the same value.
// SSkipListNode *pPrev = pNode->pBackward[0];
// while ((pPrev != &pSkipList->pHead) && filterComparFn(&pPrev->key, pKey) == 0) {
// pPrev = pPrev->pBackward[0];
// }
//
// return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
//}
//
// static SSkipListNode *tSkipListParQuery(SSkipList *pSkipList, SSkipListKey *pKey, int32_t cond) {
// int32_t sLevel = pSkipList->level - 1;
// int32_t ret = -1;
//
// SSkipListNode *x = &pSkipList->pHead;
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType);
//
// pthread_rwlock_rdlock(&pSkipList->lock);
//
// if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) {
// for (int32_t i = sLevel; i >= 0; --i) {
// while (x->pForward[i] != NULL && (ret = filterComparFn(&x->pForward[i]->key, pKey)) < 0) {
// x = x->pForward[i];
// }
// }
//
// // backward check if previous nodes are with the same value.
// if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) {
// SSkipListNode *pNode = x->pForward[0];
// while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparFn(&pNode->pBackward[0]->key, pKey) == 0)) {
// pNode = pNode->pBackward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
// return pNode;
// }
//
// if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) {
// pthread_rwlock_unlock(&pSkipList->lock);
// return x->pForward[0];
// } else { // cond == TSDB_RELATION_LARGE && ret == 0
// SSkipListNode *pn = x->pForward[0];
// while (pn != NULL && filterComparFn(&pn->key, pKey) == 0) {
// pn = pn->pForward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
// return pn;
// }
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
// return NULL;
//}
//
// int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *),
// void *param) {
// (*pRes) = (SSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize);
// if (NULL == *pRes) {
// pError("error skiplist %p, malloc failed", pSkipList);
// return -1;
// }
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// SSkipListNode *pStartNode = pSkipList->pHead.pForward[0];
// int32_t num = 0;
//
// for (int32_t i = 0; i < pSkipList->nSize; ++i) {
// if (pStartNode == NULL) {
// pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1);
//#ifdef _DEBUG_VIEW
// tSkipListPrint(pSkipList, 1);
//#endif
// break;
// }
//
// if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) {
// (*pRes)[num++] = pStartNode;
// }
//
// pStartNode = pStartNode->pForward[0];
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
//
// if (num == 0) {
// free(*pRes);
// *pRes = NULL;
// } else if (num < pSkipList->nSize) { // free unused memory
// char *tmp = realloc((*pRes), num * POINTER_BYTES);
// assert(tmp != NULL);
//
// *pRes = (SSkipListNode **)tmp;
// }
//
// return num;
//}
//
// int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter) {
// if (pSkipList == NULL) {
// return -1;
// }
//
// iter->pSkipList = pSkipList;
// if (pSkipList->lock) {
// pthread_rwlock_rdlock(&pSkipList->lock);
// }
// iter->cur = NULL; // pSkipList->pHead.pForward[0];
// iter->num = pSkipList->size;
//
// if (pSkipList->lock) {
// pthread_rwlock_unlock(&pSkipList->lock);
// }
//
// return 0;
//}
//
// bool tSkipListIteratorNext(SSkipListIterator *iter) {
// if (iter->num == 0 || iter->pSkipList == NULL) {
// return false;
// }
//
// SSkipList *pSkipList = iter->pSkipList;
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// if (iter->cur == NULL) {
// iter->cur = pSkipList->pHead.pForward[0];
// } else {
// iter->cur = iter->cur->pForward[0];
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return iter->cur != NULL;
//}
//
// SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter) { return iter->cur; }
//
// int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) {
// pSkipList->state.queryCount++;
// SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
// if (pStart == 0) {
// *pRes = NULL;
// return 0;
// }
//
// return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes);
//}
//
// static bool removeSupport(SSkipList *pSkipList, SSkipListNode **forward, SSkipListKey *pKey) {
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType);
//
// if (filterComparFn(&forward[0]->pForward[0]->key, pKey) == 0) {
// SSkipListNode *p = forward[0]->pForward[0];
// doRemove(pSkipList, p, forward);
// } else { // failed to find the node of specified value,abort
// return false;
// }
//
// // compress the minimum level of skip list
// while (pSkipList->level > 0 && pSkipList->pHead.pForward[pSkipList->level - 1] == NULL) {
// pSkipList->level -= 1;
// }
//
// return true;
//}
//
// void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// for (int32_t i = 0; i < pNode->level; ++i) {
// forward[i] = pNode->pBackward[i];
// }
//
// removeSupport(pSkipList, forward, &pNode->key);
// pthread_rwlock_unlock(&pSkipList->lock);
//}
//
// bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) {
// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType);
//
// pthread_rwlock_rdlock(&pSkipList->lock);
//
// SSkipListNode *x = &pSkipList->pHead;
// for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
// while (x->pForward[i] != NULL && (filterComparFn(&x->pForward[i]->key, pKey) < 0)) {
// x = x->pForward[i];
// }
// forward[i] = x;
// }
//
// bool ret = removeSupport(pSkipList, forward, pKey);
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return ret;
//}
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) {
return;
}
tSkipListNode *p = pSkipList->pHead.pForward[nlevel - 1];
SSkipListNode *p = SL_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
int32_t id = 1;
while (p) {
switch (pSkipList->keyType) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
switch (pSkipList->keyInfo.type) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BIGINT:
fprintf(stdout, "%d: %" PRId64 " \n", id++, p->key.i64Key);
fprintf(stdout, "%d: %" PRId64 " \n", id++, *(int64_t *)key);
break;
case TSDB_DATA_TYPE_BINARY:
fprintf(stdout, "%d: %s \n", id++, p->key.pz);
fprintf(stdout, "%d: %s \n", id++, key);
break;
case TSDB_DATA_TYPE_DOUBLE:
fprintf(stdout, "%d: %lf \n", id++, p->key.dKey);
fprintf(stdout, "%d: %lf \n", id++, *(double *)key);
break;
default:
fprintf(stdout, "\n");
}
p = p->pForward[nlevel - 1];
}
}
/*
* query processor based on query condition
*/
int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult) {
// query condition check
int32_t rel = 0;
__compar_fn_t comparator = getKeyComparator(pQueryCond->lowerBnd.nType);
if (pSkipList == NULL || pQueryCond == NULL || pSkipList->nSize == 0 ||
(((rel = comparator(&pQueryCond->lowerBnd, &pQueryCond->upperBnd)) > 0 &&
pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_NCHAR && pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_BINARY))) {
(*pResult) = NULL;
return 0;
}
if (rel == 0) {
/*
* 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd
* point query
*/
if (pQueryCond->lowerBndRelOptr == TSDB_RELATION_LARGE_EQUAL &&
pQueryCond->upperBndRelOptr == TSDB_RELATION_LESS_EQUAL) { // point query
return tSkipListGets(pSkipList, &pQueryCond->lowerBnd, pResult);
} else {
(*pResult) = NULL;
return 0;
}
} else {
/* range query, query operation code check */
return tSkipListRangeQuery(pSkipList, pQueryCond, pResult);
}
}
typedef struct MultipleQueryResult {
int32_t len;
tSkipListNode **pData;
} MultipleQueryResult;
static int32_t mergeQueryResult(MultipleQueryResult *pResults, int32_t numOfResSet, tSkipListNode ***pRes) {
int32_t total = 0;
for (int32_t i = 0; i < numOfResSet; ++i) {
total += pResults[i].len;
}
(*pRes) = malloc(POINTER_BYTES * total);
int32_t idx = 0;
for (int32_t i = 0; i < numOfResSet; ++i) {
MultipleQueryResult *pOneResult = &pResults[i];
for (int32_t j = 0; j < pOneResult->len; ++j) {
(*pRes)[idx++] = pOneResult->pData[j];
}
}
return total;
}
static void removeDuplicateKey(tSkipListKey *pKey, int32_t *numOfKey, __compar_fn_t comparator) {
if (*numOfKey == 1) {
return;
}
qsort(pKey, *numOfKey, sizeof(pKey[0]), comparator);
int32_t i = 0, j = 1;
while (i < (*numOfKey) && j < (*numOfKey)) {
int32_t ret = comparator(&pKey[i], &pKey[j]);
if (ret == 0) {
j++;
} else {
pKey[i + 1] = pKey[j];
i++;
j++;
}
}
(*numOfKey) = i + 1;
}
int32_t mergeResult(const tSkipListKey *pKey, int32_t numOfKey, tSkipListNode ***pRes, __compar_fn_t comparator,
tSkipListNode *pNode) {
int32_t i = 0, j = 0;
// merge two sorted arrays in O(n) time
while (i < numOfKey && pNode != NULL) {
int32_t ret = comparator(&pNode->key, &pKey[i]);
if (ret < 0) {
(*pRes)[j++] = pNode;
pNode = pNode->pForward[0];
} else if (ret == 0) {
pNode = pNode->pForward[0];
} else { // pNode->key > pkey[i]
i++;
}
}
while (pNode != NULL) {
(*pRes)[j++] = pNode;
pNode = pNode->pForward[0];
}
return j;
}
int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type,
tSkipListNode ***pRes) {
if (numOfKey == 0 || pKey == NULL || pSkipList == NULL || pSkipList->nSize == 0 ||
(type != INCLUDE_POINT_QUERY && type != EXCLUDE_POINT_QUERY)) {
(*pRes) = NULL;
return 0;
}
__compar_fn_t comparator = getKeyComparator(pKey->nType);
removeDuplicateKey(pKey, &numOfKey, comparator);
if (type == INCLUDE_POINT_QUERY) {
if (numOfKey == 1) {
return tSkipListGets(pSkipList, &pKey[0], pRes);
} else {
MultipleQueryResult *pTempResult = (MultipleQueryResult *)malloc(sizeof(MultipleQueryResult) * numOfKey);
for (int32_t i = 0; i < numOfKey; ++i) {
pTempResult[i].len = tSkipListGets(pSkipList, &pKey[i], &pTempResult[i].pData);
}
int32_t num = mergeQueryResult(pTempResult, numOfKey, pRes);
for (int32_t i = 0; i < numOfKey; ++i) {
free(pTempResult[i].pData);
}
free(pTempResult);
return num;
}
} else { // exclude query
*pRes = malloc(POINTER_BYTES * pSkipList->nSize);
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
tSkipListNode *pNode = pSkipList->pHead.pForward[0];
int32_t retLen = mergeResult(pKey, numOfKey, pRes, filterComparator, pNode);
if (retLen < pSkipList->nSize) {
(*pRes) = realloc(*pRes, POINTER_BYTES * retLen);
}
return retLen;
p = SL_GET_FORWARD_POINTER(p, nlevel - 1);
// p = p->pForward[nlevel - 1];
}
}
cmake_minimum_required(VERSION 2.8)
project(tsdb)
add_subdirectory(common)
add_subdirectory(tsdb)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TD_DATA_FORMAT_H_)
#define _TD_DATA_FORMAT_H_
......@@ -5,6 +19,9 @@
#include "schema.h"
#ifdef __cplusplus
extern "C" {
#endif
// ----------------- Data row structure
/* A data row, the format of it is like below:
......@@ -50,6 +67,8 @@ typedef char * SDataCols;
#define TD_DATAROW_LEN(pDataRow) (*(int32_t *)(pDataRow))
#define TD_DATAROW_DATA(pDataRow) ((pDataRow) + sizeof(int32_t))
SDataRow tdSDataRowDup(SDataRow rdata);
// ---- operation on SDataRows
#define TD_DATAROWS_LEN(pDataRows) (*(int32_t *)(pDataRows))
#define TD_DATAROWS_ROWS(pDataRows) (*(int32_t *)(pDataRows + sizeof(int32_t)))
......@@ -69,4 +88,8 @@ typedef char * SDataCols;
*/
int32_t tdGetMaxDataRowSize(SSchema *pSchema);
#ifdef __cplusplus
}
#endif
#endif // _TD_DATA_FORMAT_H_
#if !defined(_TD_KEY_H_)
#define _TD_KEY_H_
typedef struct {
} key;
#endif // _TD_KEY_H_
#if !defined(_TD_LIST_H_)
#define _TD_LIST_H_
#include <stdint.h>
typedef enum { TD_LIST_ORDERED, TD_LIST_UNORDERED } TLIST_TYPE;
typedef int32_t (* comparefn(void *key1, void *key2));
struct _list_type {
TLIST_TYPE type;
};
typedef struct _list_node {
} SListNode;
typedef struct _list {
} SList;
#endif // _TD_LIST_H_
......@@ -67,6 +67,7 @@ typedef char *SISchema;
SISchema tdConvertSchemaToInline(SSchema *pSchema);
int32_t tdGetColumnIdxByName(SSchema *pSchema, char *colName);
int32_t tdGetColumnIdxById(SSchema *pSchema, int32_t colId);
SSchema *tdDupSchema(SSchema *pSchema);
// ---- TODO: operations to modify schema
......
......@@ -16,18 +16,7 @@ typedef enum {
TD_DATATYPE_BINARY
} td_datatype_t;
const int32_t rowDataLen[] = {
sizeof(int8_t), // TD_DATATYPE_BOOL,
sizeof(int8_t), // TD_DATATYPE_TINYINT,
sizeof(int16_t), // TD_DATATYPE_SMALLINT,
sizeof(int32_t), // TD_DATATYPE_INT,
sizeof(int64_t), // TD_DATATYPE_BIGINT,
sizeof(float), // TD_DATATYPE_FLOAT,
sizeof(double), // TD_DATATYPE_DOUBLE,
sizeof(int32_t), // TD_DATATYPE_VARCHAR,
sizeof(int32_t), // TD_DATATYPE_NCHAR,
sizeof(int32_t) // TD_DATATYPE_BINARY
};
extern const int32_t rowDataLen[];
// TODO: finish below
#define TD_DATATYPE_BOOL_NULL
......
......@@ -5,15 +5,13 @@
int32_t tdGetMaxDataRowSize(SSchema *pSchema) {
int32_t nbytes = 0;
for (int32_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++)
{
SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i);
for (int32_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) {
SColumn * pCol = TD_SCHEMA_COLUMN_AT(pSchema, i);
td_datatype_t type = TD_COLUMN_TYPE(pCol);
nbytes += rowDataLen[type];
switch (type)
{
switch (type) {
case TD_DATATYPE_VARCHAR:
nbytes += TD_COLUMN_BYTES(pCol);
break;
......@@ -23,6 +21,8 @@ int32_t tdGetMaxDataRowSize(SSchema *pSchema) {
case TD_DATATYPE_BINARY:
nbytes += TD_COLUMN_BYTES(pCol);
break;
default:
break;
}
}
......@@ -30,3 +30,5 @@ int32_t tdGetMaxDataRowSize(SSchema *pSchema) {
return nbytes;
}
SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; }
\ No newline at end of file
#include <stdlib.h>
#include "schema.h"
const int32_t rowDataLen[] = {
sizeof(int8_t), // TD_DATATYPE_BOOL,
sizeof(int8_t), // TD_DATATYPE_TINYINT,
sizeof(int16_t), // TD_DATATYPE_SMALLINT,
sizeof(int32_t), // TD_DATATYPE_INT,
sizeof(int64_t), // TD_DATATYPE_BIGINT,
sizeof(float), // TD_DATATYPE_FLOAT,
sizeof(double), // TD_DATATYPE_DOUBLE,
sizeof(int32_t), // TD_DATATYPE_VARCHAR,
sizeof(int32_t), // TD_DATATYPE_NCHAR,
sizeof(int32_t) // TD_DATATYPE_BINARY
};
static size_t tdGetEstimatedISchemaLen(SSchema *pSchema) {
size_t colNameLen = 0;
......@@ -83,3 +95,7 @@ int32_t tdGetColumnIdxById(SSchema *pSchema, int32_t colId) {
}
return -1;
}
SSchema *tdDupSchema(SSchema *pSchema) {
return NULL;
}
\ No newline at end of file
......@@ -4,9 +4,20 @@
#include "tsdb.h"
TEST(TsdbTest, createTsdbRepo) {
STSDBCfg *pCfg = (STSDBCfg *)malloc(sizeof(STSDBCfg));
STsdbCfg config;
free(pCfg);
config.precision = TSDB_PRECISION_MILLI;
config.tsdbId = 0;
config.maxTables = 100;
config.daysPerFile = 10;
config.keep = 3650;
config.minRowsPerFileBlock = 100;
config.maxRowsPerFileBlock = 4096;
config.maxCacheSize = 4 * 1024 * 1024;
ASSERT_EQ(1, 2/2);
tsdb_repo_t *pRepo = tsdbCreateRepo("/root/mnt/test/vnode0", &config, NULL);
ASSERT_NE(pRepo, nullptr);
tsdbCloseRepo(pRepo);
}
\ No newline at end of file
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST)
message(STATUS "tsdb source files: ${SOURCE_LIST}")
add_library(tsdb STATIC ${SOURCE_LIST})
target_link_libraries(tsdb common tutil)
target_link_libraries(tsdb common)
target_include_directories(tsdb PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc")
\ No newline at end of file
target_include_directories(tsdb
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PUBLIC "${CMAKE_SOURCE_DIR}/src/util/inc"
PUBLIC "${CMAKE_SOURCE_DIR}/src/os/linux/inc"
)
\ No newline at end of file
/**************************************
* FOR OUTSIDE USAGE
**************************************/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TD_TSDB_H_)
#define _TD_TSDB_H_
......@@ -8,17 +19,24 @@
#include <stdbool.h>
#include <stdint.h>
// #include "cache.h"
#include "dataformat.h"
#include "schema.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_VERSION_MAJOR 1
#define TSDB_VERSION_MINOR 0
typedef void tsdb_repo_t; // use void to hide implementation details from outside
typedef int32_t table_id_t; // table ID type in this repository
typedef int16_t tsdb_id_t; // TSDB repository ID
// Submit message
typedef struct {
int64_t uid; // the unique table ID
int32_t tid; // the table ID in the repository.
} STableId;
// Submit message for this TSDB
typedef struct {
int32_t numOfTables;
int32_t compressed;
......@@ -27,77 +45,48 @@ typedef struct {
// Submit message for one table
typedef struct {
table_id_t tableId; // table ID to insert
STableId tableId;
int32_t sversion; // data schema version
int32_t numOfRows; // number of rows data
int64_t uid; // table UID to insert
char data[];
} SSubmitBlock;
// Retention policy.
typedef struct {
// TODO: Need a more fancy description
int32_t keep1;
int32_t keep2;
int32_t keep3;
} SRetentionPolicy;
// Data sharding policy.
typedef struct {
// TODO: Need a more fancy description
int32_t daysPerFile;
} SDataShardPolicy;
// Rows in file block policy
typedef struct {
// TODO: Need a more fancy description
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
} SBlockRowsPolicy;
// Applications trying to manipulate a table should provide both uid and tableId.
// tableId is used for table quick access and uid for verification.
typedef struct {
int64_t uid; // the unique table ID
table_id_t tableId; // the table ID in the repository.
} STableId;
enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO };
// the TSDB repository configuration
typedef struct {
char * rootDir; // TSDB repository root directory, TODO: need to adjust here
tsdb_id_t tsdbId;
int8_t precision;
int32_t tsdbId;
int32_t maxTables; // maximum number of tables this repository can have
SDataShardPolicy dataShardPolicy;
SBlockRowsPolicy blockRowsPolicy;
SRetentionPolicy retentionPlicy; // retention configuration
void * cachePool; // the cache pool the repository to use
} STSDBCfg;
int32_t daysPerFile; // day per file sharding policy
int32_t minRowsPerFileBlock; // minimum rows per file block
int32_t maxRowsPerFileBlock; // maximum rows per file block
int32_t keep; // day of data to keep
int64_t maxCacheSize; // maximum cache size this TSDB can use
} STsdbCfg;
// the TSDB repository info
typedef struct STSDBRepoInfo {
STSDBCfg tsdbCfg;
typedef struct STsdbRepoInfo {
STsdbCfg tsdbCfg;
int64_t version; // version of the repository
int64_t tsdbTotalDataSize; // the original inserted data size
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add
} STSDBRepoInfo;
} STsdbRepoInfo;
// the meter configuration
typedef struct {
char * tableName;
int64_t uid; // uid given by upper layer
table_id_t tableId; // table ID allocated from upper layer
STableId tableId;
char *stableName; // if not NULL, the table is created from a super table, need to make sure the super
// table exists in this TSDB.
int64_t stableUid;
int64_t createdTime;
int32_t numOfCols; // number of columns. For table form super table, not includes the tag schema
SSchema *schema; // If numOfCols == schema_->numOfCols, it is a normal table, stableName = NULL
// If numOfCols < schema->numOfCols, it is a table created from super table
// assert(numOfCols <= schema->numOfCols);
char *tagValues; // NULL if it is normal table
SDataRow tagValues; // NULL if it is normal table
// otherwise, it contains the tag values.
} STableCfg;
......@@ -111,11 +100,12 @@ typedef struct {
/**
* Create a new TSDB repository
* @param rootDir the TSDB repository root directory
* @param pCfg the TSDB repository configuration, upper layer to free the pointer
*
* @return a TSDB repository handle on success, NULL for failure and the error number is set
*/
tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg);
tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
/**
* Close and free all resources taken by the repository
......@@ -149,7 +139,7 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo);
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t tsdbConfigRepo(tsdb_repo_t repo, STSDBCfg *pCfg);
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg);
/**
* Get the TSDB repository information, including some statistics
......@@ -159,7 +149,7 @@ int32_t tsdbConfigRepo(tsdb_repo_t repo, STSDBCfg *pCfg);
* @return a info struct handle on success, NULL for failure and the error number is set. The upper
* layers should free the info handle themselves or memory leak will occur
*/
STSDBRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo);
STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo);
// -- For table manipulation
......@@ -181,7 +171,7 @@ int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tid, int32_t *error);
int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tid);
/**
* Get the information of a table in the repository
......@@ -191,7 +181,7 @@ int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tid, int32_t *error);
*
* @return a table information handle for success, NULL for failure and the error number is set
*/
STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid, int32_t *error);
STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid);
// -- FOR INSERT DATA
/**
......@@ -203,7 +193,7 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid, int32_t *error);
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t tsdbInsertData(tsdb_repo_t *pRepo, STableId tid, char *pData, int32_t *error);
int32_t tsdbInsertData(tsdb_repo_t *pRepo, STableId tid, char *pData);
// -- FOR QUERY TIME SERIES DATA
......@@ -251,7 +241,6 @@ typedef struct STableIDList {
} STableIDList;
typedef struct {
} SFields;
/**
......@@ -275,7 +264,8 @@ tsdb_query_handle_t *tsdbQueryFromTableID(tsdb_repo_t *pRepo, STSDBQueryCond *pC
* @param pTagFilterStr tag filter info
* @return
*/
tsdb_query_handle_t *tsdbQueryFromTagConds(tsdb_repo_t *pRepo, STSDBQueryCond *pCond, int16_t stableId, const char *pTagFilterStr);
tsdb_query_handle_t *tsdbQueryFromTagConds(tsdb_repo_t *pRepo, STSDBQueryCond *pCond, int16_t stableId,
const char *pTagFilterStr);
/**
* Reset to the start(end) position of current query, from which the iterator starts.
......@@ -349,4 +339,8 @@ STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
*/
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond);
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDB_H_
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TD_TSDBCACHE_H_)
#define _TD_TSDBCACHE_H_
......@@ -5,6 +19,10 @@
// #include "cache.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16*1024*1024 /* 16M */
typedef struct {
......@@ -38,4 +56,8 @@ typedef struct STSDBCache {
SCacheHandle *tsdbCreateCache(int32_t numOfBlocks);
int32_t tsdbFreeCache(SCacheHandle *pHandle);
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDBCACHE_H_
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TD_TSDB_FILE_H_)
#define _TD_TSDB_FILE_H_
#include <stdint.h>
// #include "tstring.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef int32_t file_id_t;
typedef enum {
......@@ -13,12 +31,7 @@ typedef enum {
TSDB_FILE_TYPE_META // .meta file type
} TSDB_FILE_TYPE;
const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD
".data", // TSDB_FILE_TYPE_DATA
".last", // TSDB_FILE_TYPE_LAST
".meta" // TSDB_FILE_TYPE_META
};
extern const char *tsdbFileSuffix[];
typedef struct {
int64_t fileSize;
......@@ -36,6 +49,12 @@ typedef struct {
// int16_t numOfBlocks;
// } SDataBlock;
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_META)
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type);
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDB_FILE_H_
/************************************
* For internal usage
************************************/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TSDB_META_H_)
#define _TSDB_META_H_
#include <pthread.h>
#include "dataformat.h"
#ifdef __cplusplus
extern "C" {
#endif
// #include "taosdef.h"
// Initially, there are 4 tables
......@@ -16,15 +35,15 @@ typedef enum {
} TSDB_TABLE_TYPE;
typedef struct STable {
tsdb_id_t tableId;
int64_t uid;
char * tableName;
STableId tableId;
TSDB_TABLE_TYPE type;
int64_t createdTime;
// super table UID
tsdb_id_t superTableId;
// super table UID -1 for normal table
int32_t stableUid;
int32_t numOfCols;
// Schema for this table
// For TSDB_SUPER_TABLE, it is the schema including tags
......@@ -35,7 +54,7 @@ typedef struct STable {
// Tag value for this table
// For TSDB_SUPER_TABLE and TSDB_NTABLE, it is NULL
// For TSDB_STABLE, it is the tag value string
char *pTagVal;
SDataRow pTagVal;
// Object content;
// For TSDB_SUPER_TABLE, it is the index of tables created from it
......@@ -46,23 +65,21 @@ typedef struct STable {
} content;
// A handle to deal with event
void *eventHandle;
void *eventHandler;
// A handle to deal with stream
void *streamHandle;
void *streamHandler;
struct STable *next;
} STable;
typedef struct {
int32_t numOfTables; // Number of tables not including TSDB_SUPER_TABLE (#TSDB_NTABLE + #TSDB_STABLE)
int32_t numOfSuperTables; // Number of super tables (#TSDB_SUPER_TABLE)
// An array of tables (TSDB_NTABLE and TSDB_STABLE) in this TSDB repository
STable **pTables;
// A map of tableName->tableId
// TODO: May use hash table
void *pNameTableMap;
} SMetaHandle;
int32_t maxTables;
STable **tables; // array of normal tables
STable * stables; // linked list of super tables
void * tableMap; // hash map of uid ==> STable *
} STsdbMeta;
// ---- Operation on STable
#define TSDB_TABLE_ID(pTable) ((pTable)->tableId)
......@@ -84,10 +101,18 @@ SSchema *tsdbGetTableSchema(STable *pTable);
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
// Create a new meta handle with configuration
SMetaHandle * tsdbCreateMetaHandle (int32_t numOfTables);
int32_t tsdbFreeMetaHandle(SMetaHandle *pMetaHandle);
STsdbMeta *tsdbCreateMeta(int32_t maxTables);
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
// Recover the meta handle from the file
SMetaHandle * tsdbOpenMetaHandle(char *tsdbDir);
STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir);
int32_t tsdbCreateTableImpl(STsdbMeta *pHandle, STableCfg *pCfg);
int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, char *pData);
#ifdef __cplusplus
}
#endif
int32_t tsdbCreateTableImpl(SMetaHandle *pHandle, STableCfg *pCfg);
#endif // _TSDB_META_H_
\ No newline at end of file
......@@ -14,5 +14,4 @@ SCacheHandle *tsdbCreateCache(int32_t numOfBlocks) {
}
int32_t tsdbFreeCache(SCacheHandle *pHandle) {
}
\ No newline at end of file
int32_t tsdbFreeCache(SCacheHandle *pHandle) { return 0; }
\ No newline at end of file
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tsdbFile.h"
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type){
// char *suffix = tsdbFileSuffix[type];
// TODO
const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD
".data", // TSDB_FILE_TYPE_DATA
".last", // TSDB_FILE_TYPE_LAST
".meta" // TSDB_FILE_TYPE_META
};
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) {
if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL;
char *fileName = (char *)malloc(strlen(dirName) + strlen(fname) + strlen(tsdbFileSuffix[type]) + 5);
if (fileName == NULL) return NULL;
sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]);
return fileName;
}
\ No newline at end of file
#include <stdio.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <dirent.h>
// #include "taosdef.h"
// #include "disk.h"
#include "tsdbFile.h"
#include "tsdb.h"
#include "tsdbCache.h"
#include "tsdbFile.h"
#include "tsdbMeta.h"
enum {
TSDB_REPO_STATE_ACTIVE,
TSDB_REPO_STATE_CLOSED,
TSDB_REPO_STATE_CONFIGURING
};
enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING };
typedef struct _tsdb_repo {
char *rootDir;
// TSDB configuration
STSDBCfg *pCfg;
STsdbCfg config;
// The meter meta handle of this TSDB repository
SMetaHandle *pMetaHandle;
STsdbMeta *tsdbMeta;
// The cache Handle
SCacheHandle *pCacheHandle;
SCacheHandle *tsdbCache;
// Disk tier handle for multi-tier storage
void *pDiskTier;
void *diskTier;
// File Store
void *pFileStore;
void *tsdbFiles;
pthread_mutex_t tsdbMutex;
// A limiter to monitor the resources used by tsdb
void *limiter;
int8_t state;
} STSDBRepo;
} STsdbRepo;
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
static int tsdbOpenMetaFile(char *tsdbDir);
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
#define TSDB_IS_REPO_ACTIVE(pRepo) ((pRepo)->state == TSDB_REPO_STATE_ACTIVE)
#define TSDB_IS_REPO_CLOSED(pRepo) ((pRepo)->state == TSDB_REPO_STATE_CLOSED)
// Check the correctness of the TSDB configuration
static int32_t tsdbCheckCfg(STSDBCfg *pCfg) {
if (pCfg->rootDir == NULL) return -1;
if (access(pCfg->rootDir, F_OK|R_OK|W_OK) == -1) {
return -1;
}
// TODO
return 0;
}
tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) {
static int32_t tsdbCreateFiles(STSDBRepo *pRepo) {
// TODO
}
if (rootDir == NULL) return NULL;
static int32_t tsdbClearFiles(STSDBRepo *pRepo) {
// TODO
}
if (access(rootDir, F_OK|R_OK|W_OK) == -1) return NULL;
tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg) {
// Check the configuration
if (tsdbCheckCfg(pCfg) < 0) {
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) {
return NULL;
}
STSDBRepo *pRepo = (STSDBRepo *)malloc(sizeof(STSDBRepo));
STsdbRepo *pRepo = (STsdbRepo *)malloc(sizeof(STsdbRepo));
if (pRepo == NULL) {
// TODO: deal with error
return NULL;
}
// TODO: Initailize pMetahandle
pRepo->pMetaHandle = tsdbCreateMetaHandle(pCfg->maxTables);
if (pRepo->pMetaHandle == NULL) {
// TODO: deal with error
pRepo->rootDir = strdup(rootDir);
pRepo->config = *pCfg;
pRepo->limiter = limiter;
pRepo->tsdbMeta = tsdbCreateMeta(pCfg->maxTables);
if (pRepo->tsdbMeta == NULL) {
free(pRepo->rootDir);
free(pRepo);
return NULL;
}
// TODO: Initialize cache handle
pRepo->pCacheHandle = tsdbCreateCache(5);
if (pRepo->pCacheHandle == NULL) {
// TODO: free the object and return error
tsdbFreeMetaHandle(pRepo->pCacheHandle);
pRepo->tsdbCache = tsdbCreateCache(5);
if (pRepo->tsdbCache == NULL) {
free(pRepo->rootDir);
tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo);
return NULL;
}
// Set configuration
pRepo->pCfg = pCfg;
// Create the Meta data file and data directory
if (tsdbCreateFiles(pRepo) < 0) {
// Failed to create and save files
tsdbFreeMetaHandle(pRepo->pCacheHandle);
if (tsdbSetRepoEnv(pRepo) < 0) {
free(pRepo->rootDir);
tsdbFreeMeta(pRepo->tsdbMeta);
tsdbFreeCache(pRepo->tsdbCache);
free(pRepo);
return NULL;
}
......@@ -112,93 +104,191 @@ tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg) {
}
int32_t tsdbDropRepo(tsdb_repo_t *repo) {
STSDBRepo *pRepo = (STSDBRepo *)repo;
STsdbRepo *pRepo = (STsdbRepo *)repo;
pRepo->state = TSDB_REPO_STATE_CLOSED;
// Free the metaHandle
tsdbFreeMetaHandle(pRepo->pMetaHandle);
tsdbFreeMeta(pRepo->tsdbMeta);
// Free the cache
tsdbFreeCache(pRepo->pCacheHandle);
tsdbFreeCache(pRepo->tsdbCache);
tsdbClearFiles(pRepo);
// Destroy the repository info
tsdbDestroyRepoEnv(pRepo);
free(pRepo->rootDir);
free(pRepo);
return 0;
}
tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
if (access(tsdbDir, F_OK|W_OK|R_OK) < 0) {
if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) {
return NULL;
}
STSDBRepo *pRepo = (STSDBRepo *)malloc(sizeof(STSDBRepo));
STsdbRepo *pRepo = (STsdbRepo *)malloc(sizeof(STsdbRepo));
if (pRepo == NULL) {
return NULL;
}
// TODO: Initialize configuration from the file
pRepo->pMetaHandle = tsdbOpenMetaHandle();
if (pRepo->pMetaHandle == NULL) {
int fd = tsdbOpenMetaFile(tsdbDir);
if (fd < 0) {
free(pRepo);
return NULL;
}
pRepo->pCacheHandle = tsdbCreateCache(5);
if (pRepo->pCacheHandle == NULL) {
if (tsdbRecoverRepo(fd, &(pRepo->config)) < 0) {
close(fd);
free(pRepo);
return NULL;
}
pRepo->tsdbCache = tsdbCreateCache(5);
if (pRepo->tsdbCache == NULL) {
// TODO: deal with error
return NULL;
}
pRepo->rootDir = strdup(tsdbDir);
pRepo->state = TSDB_REPO_STATE_ACTIVE;
return (tsdb_repo_t *)pRepo;
}
static int32_t tsdbFlushCache(STSDBRepo *pRepo) {
static int32_t tsdbFlushCache(STsdbRepo *pRepo) {
// TODO
return 0;
}
int32_t tsdbCloseRepo(tsdb_repo_t *repo) {
STSDBRepo *pRepo = (STSDBRepo *)repo;
tsdbFlushCache(pRepo);
STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo == NULL) return 0;
pRepo->state = TSDB_REPO_STATE_CLOSED;
tsdbFreeMetaHandle(pRepo->pMetaHandle);
tsdbFlushCache(pRepo);
tsdbFreeCache(pRepo->pMetaHandle);
tsdbFreeMeta(pRepo->tsdbMeta);
tsdbFreeCache(pRepo->tsdbCache);
return 0;
}
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STSDBCfg *pCfg) {
STSDBRepo *pRepo = (STSDBRepo *)repo;
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
pRepo->pCfg = pCfg;
pRepo->config = *pCfg;
// TODO
return 0;
}
STSDBRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo) {
STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo) {
// TODO
return NULL;
}
int32_t tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg) {
STSDBRepo *pRepo = (STSDBRepo *)repo;
return tsdbCreateTableImpl(pRepo->pMetaHandle, pCfg);
STsdbRepo *pRepo = (STsdbRepo *)repo;
return tsdbCreateTableImpl(pRepo->tsdbMeta, pCfg);
}
int32_t tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) {
// TODO
return 0;
}
int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tid) {
return 0;
}
STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid) {
// TODO
return NULL;
}
STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid, int32_t *error) {
int32_t tsdbInsertData(tsdb_repo_t *repo, STableId tableId, char *pData) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbInsertDataImpl(pRepo->tsdbMeta, tableId, pData);
return 0;
}
// Check the configuration and set default options
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// TODO
return 0;
}
int32_t tsdbInsertData(tsdb_repo_t *pRepo, STableId tid, char *pData, int32_t *error) {
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META);
int fd = open(metaFname, O_WRONLY|O_CREAT);
if (fd < 0) {
return -1;
}
if (write(fd, (void *)(&(pRepo->config)), sizeof(STsdbCfg)) < 0) {
return -1;
}
// Create the data file
char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2);
if (dirName == NULL) {
return -1;
}
sprintf(dirName, "%s/%s", pRepo->rootDir, "tsdb");
if (mkdir(dirName, 0755) < 0) {
free(dirName);
return -1;
}
free(dirName);
return 0;
}
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) {
char fname[128];
if (pRepo == NULL) return 0;
char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2);
if (dirName == NULL) {
return -1;
}
sprintf(dirName, "%s/%s", pRepo->rootDir, "tsdb");
DIR *dir = opendir(dirName);
if (dir == NULL) return -1;
struct dirent *dp;
while ((dp = readdir(dir)) != NULL) {
if ((strcmp(dp->d_name, ".") == 0) || (strcmp(dp->d_name, "..") == 0)) continue;
sprintf(fname, "%s/%s", pRepo->rootDir, dp->d_name);
remove(fname);
}
closedir(dir);
rmdir(dirName);
char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META);
remove(metaFname);
return 0;
}
static int tsdbOpenMetaFile(char *tsdbDir) {
// TODO
return 0;
}
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) {
// TODO: read tsdb configuration from file
// recover tsdb meta
return 0;
}
\ No newline at end of file
#include <stdlib.h>
// #include "taosdef.h"
#include "hash.h"
#include "tskiplist.h"
#include "tsdb.h"
#include "tsdbMeta.h"
SMetaHandle *tsdbCreateMetaHandle(int32_t numOfTables) {
SMetaHandle *pMetahandle = (SMetaHandle *)malloc(sizeof(SMetaHandle));
if (pMetahandle == NULL) {
#define TSDB_MIN_TABLES 10
#define TSDB_MAX_TABLES 100000
#define TSDB_DEFAULT_NSTABLES 10
#define IS_VALID_MAX_TABLES(maxTables) (((maxTables) >= TSDB_MIN_TABLES) && ((maxTables) <= TSDB_MAX_TABLES))
static int tsdbFreeTable(STable *pTable);
static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
static STable *tsdbGetTableByUid(int64_t uid);
static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
STsdbMeta *tsdbCreateMeta(int32_t maxTables) {
if (!IS_VALID_MAX_TABLES(maxTables)) return NULL;
STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
if (pMeta == NULL) {
return NULL;
}
pMetahandle->numOfTables = 0;
pMetahandle->numOfSuperTables = 0;
pMetahandle->pTables = calloc(sizeof(STable *), numOfTables);
if (pMetahandle->pTables == NULL) {
free(pMetahandle);
pMeta->maxTables = maxTables;
pMeta->stables = NULL;
pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));
if (pMeta->tables == NULL) {
free(pMeta);
return NULL;
}
// TODO : initialize the map
// pMetahandle->pNameTableMap = ;
if (pMetahandle->pNameTableMap == NULL) {
free(pMetahandle->pTables);
free(pMetahandle);
pMeta->tableMap = taosInitHashTable(maxTables + maxTables / 10, taosGetDefaultHashFunction, false);
if (pMeta->tableMap == NULL) {
free(pMeta->tables);
free(pMeta);
return NULL;
}
return pMetahandle;
return pMeta;
}
int32_t tsdbFreeMetaHandle(SMetaHandle *pMetaHandle) {
// TODO
int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
if (pMeta == NULL) return 0;
}
for (int i = 0; i < pMeta->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbFreeTable(pMeta->tables[i]);
}
}
static int32_t tsdbCheckTableCfg(STableCfg *pCfg) { return 0; }
free(pMeta->tables);
STable *pTable = pMeta->stables;
while (pTable != NULL) {
STable *pTemp = pTable;
pTable = pTemp->next;
tsdbFreeTable(pTemp);
}
taosCleanUpHashTable(pMeta->tableMap);
free(pMeta);
int32_t tsdbCreateTableImpl(SMetaHandle *pHandle, STableCfg *pCfg) {
return 0;
}
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
if (tsdbCheckTableCfg(pCfg) < 0) {
return -1;
}
// TODO:
STable *pSTable = NULL;
if (pCfg->stableUid > 0) { // to create a TSDB_STABLE
pSTable = tsdbGetTableByUid(pCfg->stableUid);
if (pSTable == NULL) { // super table not exists, try to create it
pSTable = (STable *)calloc(1, sizeof(STable));
if (pSTable == NULL) return -1;
pSTable->tableId.uid = pCfg->stableUid;
pSTable->tableId.tid = -1;
pSTable->type = TSDB_SUPER_TABLE;
pSTable->createdTime = pCfg->createdTime; // The created time is not required
pSTable->stableUid = -1;
pSTable->numOfCols = pCfg->numOfCols;
pSTable->pSchema = tdDupSchema(pCfg->schema);
// pSTable->content.pIndex = tSkipListCreate(5, 0, 10); // TODO: change here
tsdbAddTable(pMeta, pSTable);
} else {
if (pSTable->type != TSDB_SUPER_TABLE) return NULL;
}
}
STable *pTable = (STable *)malloc(sizeof(STable));
if (pTable == NULL) {
return -1;
}
pHandle->pTables[pCfg->tableId] = pTable;
pTable->tableId = pCfg->tableId;
pTable->createdTime = pCfg->createdTime;
if (1 /* */) { // TSDB_STABLE
pTable->type = TSDB_STABLE;
pTable->stableUid = pCfg->stableUid;
pTable->pTagVal = tdSDataRowDup(pCfg->tagValues);
} else { // TSDB_NTABLE
pTable->type = TSDB_NTABLE;
pTable->stableUid = -1;
pTable->pSchema = tdDupSchema(pCfg->schema);
}
// pTable->content.pData = tSkipListCreate(5, 0, 10); // TODO: change here
tsdbAddTable(pMeta, pTable);
// TODO: add name to it
return 0;
}
SMetaHandle * tsdbOpenMetaHandle(char *tsdbDir) {
STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir) {
// Open meta file for reading
SMetaHandle *pHandle = (SMetaHandle *)malloc(sizeof(SMetaHandle));
if (pHandle == NULL) {
STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
if (pMeta == NULL) {
return NULL;
}
return pHandle;
return pMeta;
}
int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, char *pData) {
STable *pTable = pMeta->tables[tableId.tid];
if (pTable == NULL) {
// TODO: deal with the error here
return 0;
}
if (pTable->tableId.uid != tableId.uid) {
// TODO: deal with the error here
return 0;
}
return 0;
}
static int tsdbFreeTable(STable *pTable) { return 0; }
static int32_t tsdbCheckTableCfg(STableCfg *pCfg) { return 0; }
static STable *tsdbGetTableByUid(int64_t uid) { return NULL; }
static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable) {
if (pTable->type == TSDB_SUPER_TABLE) {
if (pMeta->stables == NULL) {
pMeta->stables = pTable;
pTable->next = NULL;
} else {
STable *pTemp = pMeta->stables;
pMeta->stables = pTable;
pTable->next = pTemp;
}
} else {
pMeta->tables[pTable->tableId.tid] = pTable;
if (pTable->type == TSDB_STABLE) {
tsdbAddTableIntoIndex(pMeta, pTable);
}
}
return tsdbAddTableIntoMap(pMeta, pTable);
}
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
// TODO: add the table to the map
return 0;
}
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
// TODO
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册