未验证 提交 ba451bee 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1321 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "tscSyntaxtreefunction.h" #include "tscSyntaxtreefunction.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "taosdef.h" #include "taosdef.h"
#include "tskiplist.h" #include "sskiplist.h"
#include "tsqldef.h" #include "tsqldef.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tstoken.h" #include "tstoken.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_TSKIPLIST_H
#define TBASE_TSKIPLIST_H
#ifdef __cplusplus
extern "C" {
#endif
#define MAX_SKIP_LIST_LEVEL 20
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include "os.h"
#include "ttypes.h"
/*
* key of each node
* todo move to as the global structure in all search codes...
*/
const static size_t SKIP_LIST_STR_KEY_LENGTH_THRESHOLD = 15;
typedef tVariant tSkipListKey;
typedef enum tSkipListPointQueryType {
INCLUDE_POINT_QUERY,
EXCLUDE_POINT_QUERY,
} tSkipListPointQueryType;
typedef struct tSkipListNode {
uint16_t nLevel;
char * pData;
struct tSkipListNode **pForward;
struct tSkipListNode **pBackward;
tSkipListKey key;
} tSkipListNode;
/*
* @version 0.2
* @date 2017/11/12
* the simple version of SkipList.
* for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate
* deterministic result. Later, we will remove the lock in SkipList to further
* enhance the performance. In this case, one should use the concurrent skip list (by
* using michael-scott algorithm) instead of this simple version in a multi-thread
* environment, to achieve higher performance of read/write operations.
*
* Note: Duplicated primary key situation.
* In case of duplicated primary key, two ways can be employed to handle this situation:
* 1. add as normal insertion with out special process.
* 2. add an overflow pointer at each list node, all nodes with the same key will be added
* in the overflow pointer. In this case, the total steps of each search will be reduced significantly.
* Currently, we implement the skip list in a line with the first means, maybe refactor it soon.
*
* Memory consumption: the memory alignment causes many memory wasted. So, employ a memory
* pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs.
*
* 3. use the iterator pattern to refactor all routines to make it more clean
*/
// state struct, record following information:
// number of links in each level.
// avg search steps, for latest 1000 queries
// avg search rsp time, for latest 1000 queries
// total memory size
typedef struct tSkipListState {
// in bytes, sizeof(tSkipList)+sizeof(tSkipListNode)*tSkipList->nSize
uint64_t nTotalMemSize;
uint64_t nLevelNodeCnt[MAX_SKIP_LIST_LEVEL];
uint64_t queryCount; // total query count
/*
* only record latest 1000 queries
* when the value==1000, = 0,
* nTotalStepsForQueries = 0,
* nTotalElapsedTimeForQueries = 0
*/
uint64_t nRecQueries;
uint16_t nTotalStepsForQueries;
uint64_t nTotalElapsedTimeForQueries;
uint16_t nInsertObjs;
uint16_t nTotalStepsForInsert;
uint64_t nTotalElapsedTimeForInsert;
} tSkipListState;
typedef struct tSkipList {
tSkipListNode pHead;
uint64_t nSize;
uint16_t nMaxLevel;
uint16_t nLevel;
uint16_t keyType;
uint16_t nMaxKeyLen;
__compar_fn_t comparator;
pthread_rwlock_t lock; // will be removed soon
tSkipListState state; // skiplist state
} tSkipList;
/*
* iterate the skiplist
* this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may
* continue iterating the skiplist, so add the reference count for skiplist
* TODO add the ref for skiplist when one iterator is created
*/
typedef struct SSkipListIterator {
tSkipList * pSkipList;
tSkipListNode *cur;
int64_t num;
} SSkipListIterator;
/*
* query condition structure to denote the range query
* todo merge the point query cond with range query condition
*/
typedef struct tSKipListQueryCond {
// when the upper bounding == lower bounding, it is a point query
tSkipListKey lowerBnd;
tSkipListKey upperBnd;
int32_t lowerBndRelOptr; // relation operator to denote if lower bound is
int32_t upperBndRelOptr; // included or not
} tSKipListQueryCond;
tSkipList *SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen);
void *SSkipListDestroy(tSkipList *pSkipList);
// create skip list key
tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength);
// destroy skip list key
void tSkipListDestroyKey(tSkipListKey *pKey);
// put data into skiplist
tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey);
/*
* get only *one* node of which key is equalled to pKey, even there are more
* than one nodes are of the same key
*/
tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey);
/*
* get all data with the same keys
*/
int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes);
int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *),
void *param);
/*
* remove only one node of the pKey value.
* If more than one node has the same value, any one will be removed
*
* @Return
* true: one node has been removed
* false: no node has been removed
*/
bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey);
/*
* remove the specified node in parameters
*/
void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode);
// for debug purpose only
void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel);
/*
* range query & single point query function
*/
int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult);
/*
* include/exclude point query
*/
int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type,
tSkipListNode ***pResult);
int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator *iter);
bool tSkipListIteratorNext(SSkipListIterator *iter);
tSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter);
#ifdef __cplusplus
}
#endif
#endif // TBASE_TSKIPLIST_H
...@@ -54,7 +54,7 @@ typedef struct SSkipListNode { ...@@ -54,7 +54,7 @@ typedef struct SSkipListNode {
#define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level)) #define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level))
#define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n))) #define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n)))
#define SL_GET_NODE_LEVEL(n) *(int32_t *)((n)) #define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n))
/* /*
* @version 0.3 * @version 0.3
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tlog.h"
#include "taosdef.h"
#include "sskiplist.h"
#include "tutil.h"
static FORCE_INLINE void recordNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { // record link count in each level
for (int32_t i = 0; i < nLevel; ++i) {
pSkipList->state.nLevelNodeCnt[i]++;
}
}
static FORCE_INLINE void removeNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) {
for (int32_t i = 0; i < nLevel; ++i) {
pSkipList->state.nLevelNodeCnt[i]--;
}
}
static FORCE_INLINE int32_t getSkipListNodeRandomHeight(tSkipList *pSkipList) {
const uint32_t factor = 4;
int32_t n = 1;
while ((rand() % factor) == 0 && n <= pSkipList->nMaxLevel) {
n++;
}
return n;
}
static FORCE_INLINE int32_t getSkipListNodeLevel(tSkipList *pSkipList) {
int32_t nLevel = getSkipListNodeRandomHeight(pSkipList);
if (pSkipList->nSize == 0) {
nLevel = 1;
pSkipList->nLevel = 1;
} else {
if (nLevel > pSkipList->nLevel && pSkipList->nLevel < pSkipList->nMaxLevel) {
nLevel = (++pSkipList->nLevel);
}
}
return nLevel;
}
void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode);
void SSkipListDoRecordPut(tSkipList *pSkipList) {
const int32_t MAX_RECORD_NUM = 1000;
if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) {
pSkipList->state.nInsertObjs = 1;
pSkipList->state.nTotalStepsForInsert = 0;
pSkipList->state.nTotalElapsedTimeForInsert = 0;
} else {
pSkipList->state.nInsertObjs++;
}
}
int32_t compareIntVal(const void *pLeft, const void *pRight) {
int64_t lhs = ((tSkipListKey *)pLeft)->i64Key;
int64_t rhs = ((tSkipListKey *)pRight)->i64Key;
DEFAULT_COMP(lhs, rhs);
}
int32_t scompareIntDoubleVal(const void *pLeft, const void *pRight) {
int64_t lhs = ((tSkipListKey *)pLeft)->i64Key;
double rhs = ((tSkipListKey *)pRight)->dKey;
if (fabs(lhs - rhs) < FLT_EPSILON) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
}
int32_t scompareDoubleIntVal(const void *pLeft, const void *pRight) {
double lhs = ((tSkipListKey *)pLeft)->dKey;
int64_t rhs = ((tSkipListKey *)pRight)->i64Key;
if (fabs(lhs - rhs) < FLT_EPSILON) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
}
int32_t scompareDoubleVal(const void *pLeft, const void *pRight) {
double ret = (((tSkipListKey *)pLeft)->dKey - ((tSkipListKey *)pRight)->dKey);
if (fabs(ret) < FLT_EPSILON) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
int32_t scompareStrVal(const void *pLeft, const void *pRight) {
tSkipListKey *pL = (tSkipListKey *)pLeft;
tSkipListKey *pR = (tSkipListKey *)pRight;
if (pL->nLen == 0 && pR->nLen == 0) {
return 0;
}
//handle only one-side bound compare situation, there is only lower bound or only upper bound
if (pL->nLen == -1) {
return 1; // no lower bound, lower bound is minimum, always return -1;
} else if (pR->nLen == -1) {
return -1; // no upper bound, upper bound is maximum situation, always return 1;
}
int32_t ret = strcmp(((tSkipListKey *)pLeft)->pz, ((tSkipListKey *)pRight)->pz);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
int32_t scompareWStrVal(const void *pLeft, const void *pRight) {
tSkipListKey *pL = (tSkipListKey *)pLeft;
tSkipListKey *pR = (tSkipListKey *)pRight;
if (pL->nLen == 0 && pR->nLen == 0) {
return 0;
}
//handle only one-side bound compare situation, there is only lower bound or only upper bound
if (pL->nLen == -1) {
return 1; // no lower bound, lower bound is minimum, always return -1;
} else if (pR->nLen == -1) {
return -1; // no upper bound, upper bound is maximum situation, always return 1;
}
int32_t ret = wcscmp(((tSkipListKey *)pLeft)->wpz, ((tSkipListKey *)pRight)->wpz);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
static __compar_fn_t getKeyFilterComparator(tSkipList *pSkipList, int32_t filterDataType) {
__compar_fn_t comparator = NULL;
switch (pSkipList->keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_BOOL: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparator = compareIntVal;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparator = scompareIntDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparator = scompareDoubleIntVal;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparator = scompareDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_BINARY:
comparator = scompareStrVal;
break;
case TSDB_DATA_TYPE_NCHAR:
comparator = scompareWStrVal;
break;
default:
comparator = compareIntVal;
break;
}
return comparator;
}
static __compar_fn_t getKeyComparator(int32_t keyType) {
__compar_fn_t comparator = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_BOOL:
comparator = compareIntVal;
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
comparator = scompareDoubleVal;
break;
case TSDB_DATA_TYPE_BINARY:
comparator = scompareStrVal;
break;
case TSDB_DATA_TYPE_NCHAR:
comparator = scompareWStrVal;
break;
default:
comparator = compareIntVal;
break;
}
return comparator;
}
tSkipList* SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen) {
tSkipList *pSkipList = (tSkipList *)calloc(1, sizeof(tSkipList));
if (pSkipList == NULL) {
return NULL;
}
pSkipList->keyType = keyType;
pSkipList->comparator = getKeyComparator(keyType);
pSkipList->pHead.pForward = (tSkipListNode **)calloc(1, POINTER_BYTES * MAX_SKIP_LIST_LEVEL);
pSkipList->nMaxLevel = MAX_SKIP_LIST_LEVEL;
pSkipList->nLevel = 1;
pSkipList->nMaxKeyLen = nMaxKeyLen;
pSkipList->nMaxLevel = nMaxLevel;
if (pthread_rwlock_init(&pSkipList->lock, NULL) != 0) {
tfree(pSkipList->pHead.pForward);
tfree(pSkipList);
return NULL;
}
srand(time(NULL));
pSkipList->state.nTotalMemSize += sizeof(tSkipList);
return pSkipList;
}
static void doRemove(tSkipList *pSkipList, tSkipListNode *pNode, tSkipListNode *forward[]) {
int32_t level = pNode->nLevel;
for (int32_t j = level - 1; j >= 0; --j) {
if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) {
forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j];
}
if (forward[j]->pForward[j] != NULL) {
forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j];
}
}
pSkipList->state.nTotalMemSize -= (sizeof(tSkipListNode) + POINTER_BYTES * pNode->nLevel * 2);
removeNodeEachLevel(pSkipList, pNode->nLevel);
tfree(pNode);
--pSkipList->nSize;
}
static size_t getOneNodeSize(const tSkipListKey *pKey, int32_t nLevel) {
size_t size = sizeof(tSkipListNode) + sizeof(intptr_t) * (nLevel << 1);
if (pKey->nType == TSDB_DATA_TYPE_BINARY) {
size += pKey->nLen + 1;
} else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) {
size += (pKey->nLen + 1) * TSDB_NCHAR_SIZE;
}
return size;
}
static tSkipListNode *SSkipListCreateNode(void *pData, const tSkipListKey *pKey, int32_t nLevel) {
size_t nodeSize = getOneNodeSize(pKey, nLevel);
tSkipListNode *pNode = (tSkipListNode *)calloc(1, nodeSize);
pNode->pForward = (tSkipListNode **)(&pNode[1]);
pNode->pBackward = (pNode->pForward + nLevel);
pNode->pData = pData;
pNode->key = *pKey;
if (pKey->nType == TSDB_DATA_TYPE_BINARY) {
pNode->key.pz = (char *)(pNode->pBackward + nLevel);
strcpy(pNode->key.pz, pKey->pz);
pNode->key.pz[pKey->nLen] = 0;
} else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) {
pNode->key.wpz = (wchar_t *)(pNode->pBackward + nLevel);
wcsncpy(pNode->key.wpz, pKey->wpz, pKey->nLen);
pNode->key.wpz[pKey->nLen] = 0;
}
pNode->nLevel = nLevel;
return pNode;
}
tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength) {
tSkipListKey k = {0};
tVariantCreateFromBinary(&k, val, (uint32_t) keyLength, (uint32_t) type);
return k;
}
void tSkipListDestroyKey(tSkipListKey *pKey) { tVariantDestroy(pKey); }
void* SSkipListDestroy(tSkipList *pSkipList) {
if (pSkipList == NULL) {
return NULL;
}
pthread_rwlock_wrlock(&pSkipList->lock);
tSkipListNode *pNode = pSkipList->pHead.pForward[0];
while (pNode) {
tSkipListNode *pTemp = pNode;
pNode = pNode->pForward[0];
tfree(pTemp);
}
tfree(pSkipList->pHead.pForward);
pthread_rwlock_unlock(&pSkipList->lock);
pthread_rwlock_destroy(&pSkipList->lock);
tfree(pSkipList);
return NULL;
}
tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey) {
if (pSkipList == NULL) {
return NULL;
}
pthread_rwlock_wrlock(&pSkipList->lock);
// record one node is put into skiplist
SSkipListDoRecordPut(pSkipList);
tSkipListNode *px = &pSkipList->pHead;
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) {
while (px->pForward[i] != NULL && (pSkipList->comparator(&px->pForward[i]->key, pKey) < 0)) {
px = px->pForward[i];
}
pSkipList->state.nTotalStepsForInsert++;
forward[i] = px;
}
// if the skiplist does not allowed identical key inserted, the new data will be discarded.
if ((insertIdenticalKey == 0) && forward[0] != &pSkipList->pHead &&
(pSkipList->comparator(&forward[0]->key, pKey) == 0)) {
pthread_rwlock_unlock(&pSkipList->lock);
return forward[0];
}
int32_t nLevel = getSkipListNodeLevel(pSkipList);
recordNodeEachLevel(pSkipList, nLevel);
tSkipListNode *pNode = SSkipListCreateNode(pData, pKey, nLevel);
tSkipListDoInsert(pSkipList, forward, nLevel, pNode);
pSkipList->nSize += 1;
// char tmpstr[512] = {0};
// tVariantToString(&pNode->key, tmpstr);
// pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList,
// tmpstr, pSkipList->nSize);
pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, nLevel);
pthread_rwlock_unlock(&pSkipList->lock);
return pNode;
}
void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode) {
for (int32_t i = 0; i < nLevel; ++i) {
tSkipListNode *x = forward[i];
if (x != NULL) {
pNode->pBackward[i] = x;
if (x->pForward[i]) x->pForward[i]->pBackward[i] = pNode;
pNode->pForward[i] = x->pForward[i];
x->pForward[i] = pNode;
} else {
pSkipList->pHead.pForward[i] = pNode;
pNode->pBackward[i] = &(pSkipList->pHead);
}
}
}
tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey) {
int32_t sLevel = pSkipList->nLevel - 1;
int32_t ret = -1;
tSkipListNode *x = &pSkipList->pHead;
pthread_rwlock_rdlock(&pSkipList->lock);
pSkipList->state.queryCount++;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
for (int32_t i = sLevel; i >= 0; --i) {
while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) {
x = x->pForward[i];
}
if (ret == 0) {
pthread_rwlock_unlock(&pSkipList->lock);
return x->pForward[i];
}
}
pthread_rwlock_unlock(&pSkipList->lock);
return NULL;
}
static int32_t tSkipListEndParQuery(tSkipList *pSkipList, tSkipListNode *pStartNode, tSkipListKey *pEndKey,
int32_t cond, tSkipListNode ***pRes) {
pthread_rwlock_rdlock(&pSkipList->lock);
tSkipListNode *p = pStartNode;
int32_t numOfRes = 0;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pEndKey->nType);
while (p != NULL) {
int32_t ret = filterComparator(&p->key, pEndKey);
if (ret > 0) {
break;
}
if (ret < 0) {
numOfRes++;
p = p->pForward[0];
} else if (ret == 0) {
if (cond == TSDB_RELATION_LESS_EQUAL) {
numOfRes++;
p = p->pForward[0];
} else {
break;
}
}
}
(*pRes) = (tSkipListNode **)malloc(POINTER_BYTES * numOfRes);
for (int32_t i = 0; i < numOfRes; ++i) {
(*pRes)[i] = pStartNode;
pStartNode = pStartNode->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return numOfRes;
}
/*
* maybe return the copy of tSkipListNode would be better
*/
int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes) {
(*pRes) = NULL;
tSkipListNode *pNode = tSkipListGetOne(pSkipList, pKey);
if (pNode == NULL) {
return 0;
}
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
// backward check if previous nodes are with the same value.
tSkipListNode *pPrev = pNode->pBackward[0];
while ((pPrev != &pSkipList->pHead) && filterComparator(&pPrev->key, pKey) == 0) {
pPrev = pPrev->pBackward[0];
}
return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
}
static tSkipListNode *tSkipListParQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t cond) {
int32_t sLevel = pSkipList->nLevel - 1;
int32_t ret = -1;
tSkipListNode *x = &pSkipList->pHead;
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
pthread_rwlock_rdlock(&pSkipList->lock);
if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) {
for (int32_t i = sLevel; i >= 0; --i) {
while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) {
x = x->pForward[i];
}
}
// backward check if previous nodes are with the same value.
if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) {
tSkipListNode *pNode = x->pForward[0];
while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparator(&pNode->pBackward[0]->key, pKey) == 0)) {
pNode = pNode->pBackward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return pNode;
}
if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) {
pthread_rwlock_unlock(&pSkipList->lock);
return x->pForward[0];
} else { // cond == TSDB_RELATION_LARGE && ret == 0
tSkipListNode *pn = x->pForward[0];
while (pn != NULL && filterComparator(&pn->key, pKey) == 0) {
pn = pn->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return pn;
}
}
pthread_rwlock_unlock(&pSkipList->lock);
return NULL;
}
int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *),
void *param) {
(*pRes) = (tSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize);
if (NULL == *pRes) {
pError("error skiplist %p, malloc failed", pSkipList);
return -1;
}
pthread_rwlock_rdlock(&pSkipList->lock);
tSkipListNode *pStartNode = pSkipList->pHead.pForward[0];
int32_t num = 0;
for (int32_t i = 0; i < pSkipList->nSize; ++i) {
if (pStartNode == NULL) {
pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1);
#ifdef _DEBUG_VIEW
SSkipListPrint(pSkipList, 1);
#endif
break;
}
if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) {
(*pRes)[num++] = pStartNode;
}
pStartNode = pStartNode->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
if (num == 0) {
free(*pRes);
*pRes = NULL;
} else if (num < pSkipList->nSize) { // free unused memory
char* tmp = realloc((*pRes), num * POINTER_BYTES);
assert(tmp != NULL);
*pRes = (tSkipListNode**)tmp;
}
return num;
}
int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator* iter) {
if (pSkipList == NULL) {
return -1;
}
iter->pSkipList = pSkipList;
pthread_rwlock_rdlock(&pSkipList->lock);
iter->cur = NULL;//pSkipList->pHead.pForward[0];
iter->num = pSkipList->nSize;
pthread_rwlock_unlock(&pSkipList->lock);
return 0;
}
bool tSkipListIteratorNext(SSkipListIterator* iter) {
if (iter->num == 0 || iter->pSkipList == NULL) {
return false;
}
tSkipList* pSkipList = iter->pSkipList;
pthread_rwlock_rdlock(&pSkipList->lock);
if (iter->cur == NULL) {
iter->cur = pSkipList->pHead.pForward[0];
} else {
iter->cur = iter->cur->pForward[0];
}
pthread_rwlock_unlock(&pSkipList->lock);
return iter->cur != NULL;
}
tSkipListNode* tSkipListIteratorGet(SSkipListIterator* iter) {
return iter->cur;
}
int32_t tSkipListRangeQuery(tSkipList *pSkipList, tSKipListQueryCond *pCond, tSkipListNode ***pRes) {
pSkipList->state.queryCount++;
tSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
if (pStart == 0) {
*pRes = NULL;
return 0;
}
return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes);
}
static bool removeSupport(tSkipList *pSkipList, tSkipListNode **forward, tSkipListKey *pKey) {
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
if (filterComparator(&forward[0]->pForward[0]->key, pKey) == 0) {
tSkipListNode *p = forward[0]->pForward[0];
doRemove(pSkipList, p, forward);
} else { // failed to find the node of specified value,abort
return false;
}
// compress the minimum level of skip list
while (pSkipList->nLevel > 0 && pSkipList->pHead.pForward[pSkipList->nLevel - 1] == NULL) {
pSkipList->nLevel -= 1;
}
return true;
}
void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode) {
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
pthread_rwlock_rdlock(&pSkipList->lock);
for (int32_t i = 0; i < pNode->nLevel; ++i) {
forward[i] = pNode->pBackward[i];
}
removeSupport(pSkipList, forward, &pNode->key);
pthread_rwlock_unlock(&pSkipList->lock);
}
bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey) {
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
pthread_rwlock_rdlock(&pSkipList->lock);
tSkipListNode *x = &pSkipList->pHead;
for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) {
while (x->pForward[i] != NULL && (filterComparator(&x->pForward[i]->key, pKey) < 0)) {
x = x->pForward[i];
}
forward[i] = x;
}
bool ret = removeSupport(pSkipList, forward, pKey);
pthread_rwlock_unlock(&pSkipList->lock);
return ret;
}
void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel) {
if (pSkipList == NULL || pSkipList->nLevel < nlevel || nlevel <= 0) {
return;
}
tSkipListNode *p = pSkipList->pHead.pForward[nlevel - 1];
int32_t id = 1;
while (p) {
switch (pSkipList->keyType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BIGINT:
fprintf(stdout, "%d: %" PRId64 " \n", id++, p->key.i64Key);
break;
case TSDB_DATA_TYPE_BINARY:
fprintf(stdout, "%d: %s \n", id++, p->key.pz);
break;
case TSDB_DATA_TYPE_DOUBLE:
fprintf(stdout, "%d: %lf \n", id++, p->key.dKey);
break;
default:
fprintf(stdout, "\n");
}
p = p->pForward[nlevel - 1];
}
}
/*
* query processor based on query condition
*/
int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult) {
// query condition check
int32_t rel = 0;
__compar_fn_t comparator = getKeyComparator(pQueryCond->lowerBnd.nType);
if (pSkipList == NULL || pQueryCond == NULL || pSkipList->nSize == 0 ||
(((rel = comparator(&pQueryCond->lowerBnd, &pQueryCond->upperBnd)) > 0 &&
pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_NCHAR && pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_BINARY))) {
(*pResult) = NULL;
return 0;
}
if (rel == 0) {
/*
* 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd
* point query
*/
if (pQueryCond->lowerBndRelOptr == TSDB_RELATION_LARGE_EQUAL &&
pQueryCond->upperBndRelOptr == TSDB_RELATION_LESS_EQUAL) { // point query
return tSkipListGets(pSkipList, &pQueryCond->lowerBnd, pResult);
} else {
(*pResult) = NULL;
return 0;
}
} else {
/* range query, query operation code check */
return tSkipListRangeQuery(pSkipList, pQueryCond, pResult);
}
}
typedef struct MultipleQueryResult {
int32_t len;
tSkipListNode **pData;
} MultipleQueryResult;
static int32_t mergeQueryResult(MultipleQueryResult *pResults, int32_t numOfResSet, tSkipListNode ***pRes) {
int32_t total = 0;
for (int32_t i = 0; i < numOfResSet; ++i) {
total += pResults[i].len;
}
(*pRes) = malloc(POINTER_BYTES * total);
int32_t idx = 0;
for (int32_t i = 0; i < numOfResSet; ++i) {
MultipleQueryResult *pOneResult = &pResults[i];
for (int32_t j = 0; j < pOneResult->len; ++j) {
(*pRes)[idx++] = pOneResult->pData[j];
}
}
return total;
}
static void removeDuplicateKey(tSkipListKey *pKey, int32_t *numOfKey, __compar_fn_t comparator) {
if (*numOfKey == 1) {
return;
}
qsort(pKey, *numOfKey, sizeof(pKey[0]), comparator);
int32_t i = 0, j = 1;
while (i < (*numOfKey) && j < (*numOfKey)) {
int32_t ret = comparator(&pKey[i], &pKey[j]);
if (ret == 0) {
j++;
} else {
pKey[i + 1] = pKey[j];
i++;
j++;
}
}
(*numOfKey) = i + 1;
}
int32_t mergeResult(const tSkipListKey *pKey, int32_t numOfKey, tSkipListNode ***pRes, __compar_fn_t comparator,
tSkipListNode *pNode) {
int32_t i = 0, j = 0;
// merge two sorted arrays in O(n) time
while (i < numOfKey && pNode != NULL) {
int32_t ret = comparator(&pNode->key, &pKey[i]);
if (ret < 0) {
(*pRes)[j++] = pNode;
pNode = pNode->pForward[0];
} else if (ret == 0) {
pNode = pNode->pForward[0];
} else { // pNode->key > pkey[i]
i++;
}
}
while (pNode != NULL) {
(*pRes)[j++] = pNode;
pNode = pNode->pForward[0];
}
return j;
}
int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type,
tSkipListNode ***pRes) {
if (numOfKey == 0 || pKey == NULL || pSkipList == NULL || pSkipList->nSize == 0 ||
(type != INCLUDE_POINT_QUERY && type != EXCLUDE_POINT_QUERY)) {
(*pRes) = NULL;
return 0;
}
__compar_fn_t comparator = getKeyComparator(pKey->nType);
removeDuplicateKey(pKey, &numOfKey, comparator);
if (type == INCLUDE_POINT_QUERY) {
if (numOfKey == 1) {
return tSkipListGets(pSkipList, &pKey[0], pRes);
} else {
MultipleQueryResult *pTempResult = (MultipleQueryResult *)malloc(sizeof(MultipleQueryResult) * numOfKey);
for (int32_t i = 0; i < numOfKey; ++i) {
pTempResult[i].len = tSkipListGets(pSkipList, &pKey[i], &pTempResult[i].pData);
}
int32_t num = mergeQueryResult(pTempResult, numOfKey, pRes);
for (int32_t i = 0; i < numOfKey; ++i) {
free(pTempResult[i].pData);
}
free(pTempResult);
return num;
}
} else { // exclude query
*pRes = malloc(POINTER_BYTES * pSkipList->nSize);
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
tSkipListNode *pNode = pSkipList->pHead.pForward[0];
int32_t retLen = mergeResult(pKey, numOfKey, pRes, filterComparator, pNode);
if (retLen < pSkipList->nSize) {
(*pRes) = realloc(*pRes, POINTER_BYTES * retLen);
}
return retLen;
}
}
...@@ -126,7 +126,7 @@ SHistogramInfo* tHistogramCreate(int32_t numOfEntries) { ...@@ -126,7 +126,7 @@ SHistogramInfo* tHistogramCreate(int32_t numOfEntries) {
SHistogramInfo* pHisto = malloc(sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfEntries + 1)); SHistogramInfo* pHisto = malloc(sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfEntries + 1));
#if !defined(USE_ARRAYLIST) #if !defined(USE_ARRAYLIST)
pHisto->pList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); pHisto->pList = SSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
SInsertSupporter* pss = malloc(sizeof(SInsertSupporter)); SInsertSupporter* pss = malloc(sizeof(SInsertSupporter));
pss->numOfEntries = pHisto->maxEntries; pss->numOfEntries = pHisto->maxEntries;
pss->pSkipList = pHisto->pList; pss->pSkipList = pHisto->pList;
...@@ -185,7 +185,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -185,7 +185,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
SHistBin* entry = calloc(1, sizeof(SHistBin)); SHistBin* entry = calloc(1, sizeof(SHistBin));
entry->val = val; entry->val = val;
tSkipListNode* pResNode = tSkipListPut((*pHisto)->pList, entry, &key, 0); tSkipListNode* pResNode = SSkipListPut((*pHisto)->pList, entry, &key, 0);
SHistBin* pEntry1 = (SHistBin*)pResNode->pData; SHistBin* pEntry1 = (SHistBin*)pResNode->pData;
pEntry1->index = -1; pEntry1->index = -1;
...@@ -239,7 +239,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -239,7 +239,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
// set the right value for loser-tree // set the right value for loser-tree
assert((*pHisto)->pLoserTree != NULL); assert((*pHisto)->pLoserTree != NULL);
if (!(*pHisto)->ordered) { if (!(*pHisto)->ordered) {
tSkipListPrint((*pHisto)->pList, 1); SSkipListPrint((*pHisto)->pList, 1);
SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; SLoserTreeInfo* pTree = (*pHisto)->pLoserTree;
tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0]; tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0];
...@@ -281,7 +281,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -281,7 +281,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
printf("delta is:%lf\n", pEntry1->delta); printf("delta is:%lf\n", pEntry1->delta);
tSkipListPrint((*pHisto)->pList, 1); SSkipListPrint((*pHisto)->pList, 1);
/* the chosen node */ /* the chosen node */
tSkipListNode* pNode = (*pHisto)->pLoserTree->pNode[0].pData; tSkipListNode* pNode = (*pHisto)->pLoserTree->pNode[0].pData;
...@@ -319,7 +319,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { ...@@ -319,7 +319,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
tLoserTreeAdjust(pTree, pEntry->index + pTree->numOfEntries); tLoserTreeAdjust(pTree, pEntry->index + pTree->numOfEntries);
// remove the next node in skiplist // remove the next node in skiplist
tSkipListRemoveNode((*pHisto)->pList, pNext); tSkipListRemoveNode((*pHisto)->pList, pNext);
tSkipListPrint((*pHisto)->pList, 1); SSkipListPrint((*pHisto)->pList, 1);
tLoserTreeDisplay((*pHisto)->pLoserTree); tLoserTreeDisplay((*pHisto)->pLoserTree);
} else { // add to heap } else { // add to heap
......
...@@ -6,4 +6,4 @@ add_subdirectory(tsdb) ...@@ -6,4 +6,4 @@ add_subdirectory(tsdb)
enable_testing() enable_testing()
add_subdirectory(tests) # add_subdirectory(tests)
\ No newline at end of file \ No newline at end of file
...@@ -12,10 +12,11 @@ ...@@ -12,10 +12,11 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#if !defined(_TD_DATA_FORMAT_H_) #ifndef _TD_DATA_FORMAT_H_
#define _TD_DATA_FORMAT_H_ #define _TD_DATA_FORMAT_H_
#include <stdint.h> #include <stdint.h>
#include <stdlib.h>
#include "schema.h" #include "schema.h"
...@@ -24,23 +25,57 @@ extern "C" { ...@@ -24,23 +25,57 @@ extern "C" {
#endif #endif
// ----------------- Data row structure // ----------------- Data row structure
/* A data row, the format of it is like below: /* A data row, the format is like below:
* +---------+---------------------------------+ * +---------+---------------------------------+
* | int32_t | | * | int32_t | |
* +---------+---------------------------------+ * +---------+---------------------------------+
* | len | data | * | len | row |
* +---------+---------------------------------+ * +---------+---------------------------------+
* len: the length including sizeof(row) + sizeof(len)
* row: actual row data encoding
*/ */
typedef char* SDataRow; typedef void *SDataRow;
#define dataRowLen(r) (*(int32_t *)(r))
#define dataRowTuple(r) ((char *)(r) + sizeof(int32_t))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowIdx(r, i) ((char *)(r) + i)
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
SDataRow tdNewDataRow(int32_t bytes);
SDataRow tdNewDdataFromSchema(SSchema *pSchema);
void tdFreeDataRow(SDataRow row);
int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset);
void tdDataRowCpy(void *dst, SDataRow row);
void tdDataRowReset(SDataRow row);
SDataRow tdDataRowDup(SDataRow row);
/* Data rows definition, the format of it is like below: /* Data rows definition, the format of it is like below:
* +---------+---------+-----------------------+--------+-----------------------+ * +---------+-----------------------+--------+-----------------------+
* | int32_t | int32_t | | | | * | int32_t | | | |
* +---------+---------+-----------------------+--------+-----------------------+ * +---------+-----------------------+--------+-----------------------+
* | len | nrows | SDataRow | .... | SDataRow | * | len | SDataRow | .... | SDataRow |
* +---------+---------+-----------------------+--------+-----------------------+ * +---------+-----------------------+--------+-----------------------+
*/ */
typedef char * SDataRows; typedef void *SDataRows;
#define TD_DATA_ROWS_HEAD_LEN sizeof(int32_t)
#define dataRowsLen(rs) (*(int32_t *)(rs))
#define dataRowsSetLen(rs, l) (dataRowsLen(rs) = (l))
#define dataRowsInit(rs) dataRowsSetLen(rs, sizeof(int32_t))
void tdDataRowsAppendRow(SDataRows rows, SDataRow row);
// Data rows iterator
typedef struct {
int32_t totalLen;
int32_t len;
SDataRow row;
} SDataRowsIter;
void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter);
SDataRow tdDataRowsNext(SDataRowsIter *pIter);
/* Data column definition /* Data column definition
* +---------+---------+-----------------------+ * +---------+---------+-----------------------+
...@@ -49,7 +84,7 @@ typedef char * SDataRows; ...@@ -49,7 +84,7 @@ typedef char * SDataRows;
* | len | npoints | data | * | len | npoints | data |
* +---------+---------+-----------------------+ * +---------+---------+-----------------------+
*/ */
typedef char * SDataCol; typedef char *SDataCol;
/* Data columns definition /* Data columns definition
* +---------+---------+-----------------------+--------+-----------------------+ * +---------+---------+-----------------------+--------+-----------------------+
...@@ -58,38 +93,10 @@ typedef char * SDataCol; ...@@ -58,38 +93,10 @@ typedef char * SDataCol;
* | len | npoints | SDataCol | .... | SDataCol | * | len | npoints | SDataCol | .... | SDataCol |
* +---------+---------+-----------------------+--------+-----------------------+ * +---------+---------+-----------------------+--------+-----------------------+
*/ */
typedef char * SDataCols; typedef char *SDataCols;
// ----------------- Data column structure
// ---- operation on SDataRow;
#define TD_DATA_ROW_HEADER_SIZE sizeof(int32_t)
#define TD_DATAROW_LEN(pDataRow) (*(int32_t *)(pDataRow))
#define TD_DATAROW_DATA(pDataRow) ((pDataRow) + sizeof(int32_t))
SDataRow tdSDataRowDup(SDataRow rdata);
// ---- operation on SDataRows
#define TD_DATAROWS_LEN(pDataRows) (*(int32_t *)(pDataRows))
#define TD_DATAROWS_ROWS(pDataRows) (*(int32_t *)(pDataRows + sizeof(int32_t)))
#define TD_NEXT_DATAROW(pDataRow) ((pDataRow) + TD_DATAROW_LEN(pDataRow))
// ---- operation on SDataCol
#define TD_DATACOL_LEN(pDataCol) (*(int32_t *)(pDataCol))
#define TD_DATACOL_NPOINTS(pDataCol) (*(int32_t *)(pDataCol + sizeof(int32_t)))
// ---- operation on SDataCols
#define TD_DATACOLS_LEN(pDataCols) (*(int32_t *)(pDataCols))
#define TD_DATACOLS_NPOINTS(pDataCols) (*(int32_t *)(pDataCols + sizeof(int32_t)))
// ----
/**
* Get the maximum
*/
int32_t tdGetMaxDataRowSize(SSchema *pSchema);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif // _TD_DATA_FORMAT_H_ #endif // _TD_DATA_FORMAT_H_
#if !defined(_TD_KEY_H_)
#define _TD_KEY_H_
typedef struct {
} key;
#endif // _TD_KEY_H_
#if !defined(_TD_LIST_H_)
#define _TD_LIST_H_
#include <stdint.h>
typedef enum { TD_LIST_ORDERED, TD_LIST_UNORDERED } TLIST_TYPE;
typedef int32_t (* comparefn(void *key1, void *key2));
struct _list_type {
TLIST_TYPE type;
};
typedef struct _list_node {
} SListNode;
typedef struct _list {
} SList;
#endif // _TD_LIST_H_
#if !defined(_TD_SCHEMA_H_) #ifndef _TD_SCHEMA_H_
#define _TD_SCHEMA_H_ #define _TD_SCHEMA_H_
#include <stdint.h> #include <stdint.h>
...@@ -6,25 +6,50 @@ ...@@ -6,25 +6,50 @@
#include "type.h" #include "type.h"
// Column definition #ifdef __cplusplus
// TODO: if we need to align the structure extern "C" {
#endif
// ---- Column definition and operations
typedef struct { typedef struct {
td_datatype_t type; // Column type int8_t type; // Column type
int32_t colId; // column ID int16_t colId; // column ID
int32_t bytes; // column bytes int16_t bytes; // column bytes
int32_t offset; // point offset in a row data int32_t offset; // point offset in a row data
char * colName; // the column name
} SColumn; } SColumn;
// Schema definition #define colType(col) ((col)->type)
#define colColId(col) ((col)->colId)
#define colBytes(col) ((col)->bytes)
#define colOffset(col) ((col)->offset)
#define colSetType(col, t) (colType(col) = (t))
#define colSetColId(col, id) (colColId(col) = (id))
#define colSetBytes(col, b) (colBytes(col) = (b))
#define colSetOffset(col, o) (colOffset(col) = (o))
SColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes);
void tdFreeCol(SColumn *pCol);
void tdColCpy(SColumn *dst, SColumn *src);
// ---- Schema definition and operations
typedef struct { typedef struct {
int32_t version; // schema version, it is used to change the schema
int32_t numOfCols; int32_t numOfCols;
int32_t numOfTags; int32_t padding; // TODO: replace the padding for useful variable
int32_t colIdCounter; SColumn columns[];
SColumn *columns;
} SSchema; } SSchema;
#define schemaNCols(s) ((s)->numOfCols)
#define schemaColAt(s, i) ((s)->columns + i)
SSchema *tdNewSchema(int32_t nCols);
SSchema *tdDupSchema(SSchema *pSchema);
void tdFreeSchema(SSchema *pSchema);
void tdUpdateSchema(SSchema *pSchema);
int32_t tdMaxRowDataBytes(SSchema *pSchema);
// ---- Inline schema definition and operations
/* Inline schema definition /* Inline schema definition
* +---------+---------+---------+-----+---------+-----------+-----+-----------+ * +---------+---------+---------+-----+---------+-----------+-----+-----------+
* | int32_t | | | | | | | | * | int32_t | | | | | | | |
...@@ -34,41 +59,10 @@ typedef struct { ...@@ -34,41 +59,10 @@ typedef struct {
*/ */
typedef char *SISchema; typedef char *SISchema;
// TODO: decide if the space is allowed // TODO: add operations on SISchema
#define TD_ISCHEMA_HEADER_SIZE sizeof(int32_t) + sizeof(SSchema)
// ---- operations on SColumn
#define TD_COLUMN_TYPE(pCol) ((pCol)->type) // column type
#define TD_COLUMN_ID(pCol) ((pCol)->colId) // column ID
#define TD_COLUMN_BYTES(pCol) ((pCol)->bytes) // column bytes
#define TD_COLUMN_OFFSET(pCol) ((pCol)->offset) // column bytes
#define TD_COLUMN_NAME(pCol) ((pCol)->colName) // column name
#define TD_COLUMN_INLINE_SIZE(pCol) (sizeof(SColumn) + TD_COLUMN_NAME(pCol) + 1)
// ---- operations on SSchema
#define TD_SCHEMA_VERSION(pSchema) ((pSchema)->version) // schema version
#define TD_SCHEMA_NCOLS(pSchema) ((pSchema)->numOfCols) // schema number of columns
#define TD_SCHEMA_NTAGS(pSchema) ((pSchema)->numOfTags) // schema number of tags
#define TD_SCHEMA_TOTAL_COLS(pSchema) (TD_SCHEMA_NCOLS(pSchema) + TD_SCHEMA_NTAGS(pSchema)) // schema total number of SColumns (#columns + #tags)
#define TD_SCHEMA_NEXT_COLID(pSchema) ((pSchema)->colIdCounter++)
#define TD_SCHEMA_COLS(pSchema) ((pSchema)->columns)
#define TD_SCHEMA_TAGS(pSchema) (TD_SCHEMA_COLS(pSchema) + TD_SCHEMA_NCOLS(pSchema))
#define TD_SCHEMA_COLUMN_AT(pSchema, idx) (TD_SCHEMA_COLS(pSchema) + idx)
#define TD_SCHEMA_TAG_AT(pSchema, idx) (TD_SCHEMA_TAGS(pSchema) + idx)
// ---- operations on SISchema
#define TD_ISCHEMA_LEN(pISchema) *((int32_t *)(pISchema))
#define TD_ISCHEMA_SCHEMA(pISchema) ((SSchema *)((pISchema) + sizeof(int32_t)))
#define TD_ISCHEMA_COL_NAMES(pISchema) ((pISchema) + TD_ISCHEMA_HEADER_SIZE + sizeof(SColumn) * TD_SCHEMA_TOTAL_COLS(TD_ISCHEMA_SCHEMA(pISchema)))
// ----
/* Convert a schema structure to an inline schema structure
*/
SISchema tdConvertSchemaToInline(SSchema *pSchema);
int32_t tdGetColumnIdxByName(SSchema *pSchema, char *colName);
int32_t tdGetColumnIdxById(SSchema *pSchema, int32_t colId);
SSchema *tdDupSchema(SSchema *pSchema);
// ---- TODO: operations to modify schema #ifdef __cplusplus
}
#endif
#endif // _TD_SCHEMA_H_ #endif // _TD_SCHEMA_H_
#if !defined(_TD_TYPE_H_) #ifndef _TD_TYPE_H_
#define _TD_TYPE_H_ #define _TD_TYPE_H_
#include <stdint.h> #include <stdint.h>
...@@ -11,6 +11,7 @@ typedef enum { ...@@ -11,6 +11,7 @@ typedef enum {
TD_DATATYPE_BIGINT, TD_DATATYPE_BIGINT,
TD_DATATYPE_FLOAT, TD_DATATYPE_FLOAT,
TD_DATATYPE_DOUBLE, TD_DATATYPE_DOUBLE,
TD_DATATYPE_TIMESTAMP,
TD_DATATYPE_VARCHAR, TD_DATATYPE_VARCHAR,
TD_DATATYPE_NCHAR, TD_DATATYPE_NCHAR,
TD_DATATYPE_BINARY TD_DATATYPE_BINARY
...@@ -26,6 +27,7 @@ extern const int32_t rowDataLen[]; ...@@ -26,6 +27,7 @@ extern const int32_t rowDataLen[];
#define TD_DATATYPE_BIGINT_NULL #define TD_DATATYPE_BIGINT_NULL
#define TD_DATATYPE_FLOAT_NULL #define TD_DATATYPE_FLOAT_NULL
#define TD_DATATYPE_DOUBLE_NULL #define TD_DATATYPE_DOUBLE_NULL
#define TD_DATATYPE_TIMESTAMP_NULL
#define TD_DATATYPE_VARCHAR_NULL #define TD_DATATYPE_VARCHAR_NULL
#define TD_DATATYPE_NCHAR_NULL #define TD_DATATYPE_NCHAR_NULL
#define TD_DATATYPE_BINARY_NULL #define TD_DATATYPE_BINARY_NULL
......
#include <stdlib.h>
#include "dataformat.h" #include "dataformat.h"
int32_t tdGetMaxDataRowSize(SSchema *pSchema) { /**
int32_t nbytes = 0; * Create a data row with maximum row length bytes.
*
for (int32_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) { * NOTE: THE AAPLICATION SHOULD MAKE SURE BYTES IS LARGE ENOUGH TO
SColumn * pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); * HOLD THE WHOE ROW.
td_datatype_t type = TD_COLUMN_TYPE(pCol); *
* @param bytes max bytes a row can take
nbytes += rowDataLen[type]; * @return SDataRow object for success
* NULL for failure
switch (type) { */
case TD_DATATYPE_VARCHAR: SDataRow tdNewDataRow(int32_t bytes) {
nbytes += TD_COLUMN_BYTES(pCol); int32_t size = sizeof(int32_t) + bytes;
break;
case TD_DATATYPE_NCHAR: SDataRow row = malloc(size);
nbytes += 4 * TD_COLUMN_BYTES(pCol); if (row == NULL) return NULL;
break;
case TD_DATATYPE_BINARY: dataRowSetLen(row, sizeof(int32_t));
nbytes += TD_COLUMN_BYTES(pCol);
break; return row;
default: }
break;
} SDataRow tdNewDdataFromSchema(SSchema *pSchema) {
int32_t bytes = tdMaxRowDataBytes(pSchema);
return tdNewDataRow(bytes);
}
/**
* Free the SDataRow object
*/
void tdFreeDataRow(SDataRow row) {
if (row) free(row);
}
/**
* Append a column value to a SDataRow object.
* NOTE: THE APPLICATION SHOULD MAKE SURE VALID PARAMETERS. THE FUNCTION ASSUMES
* THE ROW OBJECT HAS ENOUGH SPACE TO HOLD THE VALUE.
*
* @param row the row to append value to
* @param value value pointer to append
* @param pSchema schema
* @param colIdx column index
*
* @return 0 for success and -1 for failure
*/
int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset) {
int32_t offset;
switch (pCol->type) {
case TD_DATATYPE_BOOL:
case TD_DATATYPE_TINYINT:
case TD_DATATYPE_SMALLINT:
case TD_DATATYPE_INT:
case TD_DATATYPE_BIGINT:
case TD_DATATYPE_FLOAT:
case TD_DATATYPE_DOUBLE:
case TD_DATATYPE_TIMESTAMP:
memcpy(dataRowIdx(row, pCol->offset + sizeof(int32_t)), value, rowDataLen[pCol->type]);
if (dataRowLen(row) < suffixOffset + sizeof(int32_t))
dataRowSetLen(row, dataRowLen(row) + rowDataLen[pCol->type]);
break;
case TD_DATATYPE_VARCHAR:
offset = dataRowLen(row) > suffixOffset ? dataRowLen(row) : suffixOffset;
memcpy(dataRowIdx(row, pCol->offset+sizeof(int32_t)), (void *)(&offset), sizeof(offset));
case TD_DATATYPE_NCHAR:
case TD_DATATYPE_BINARY:
break;
default:
return -1;
} }
nbytes += TD_DATA_ROW_HEADER_SIZE; return 0;
}
return nbytes; /**
* Copy a data row to a destination
* ASSUMPTIONS: dst has enough room for a copy of row
*/
void tdDataRowCpy(void *dst, SDataRow row) { memcpy(dst, row, dataRowLen(row)); }
void tdDataRowReset(SDataRow row) { dataRowSetLen(row, sizeof(int32_t)); }
SDataRow tdDataRowDup(SDataRow row) {
SDataRow trow = tdNewDataRow(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
return row;
}
void tdDataRowsAppendRow(SDataRows rows, SDataRow row) {
tdDataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row);
dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row));
} }
SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; } // Initialize the iterator
\ No newline at end of file void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter) {
if (pIter == NULL) return;
pIter->totalLen = dataRowsLen(rows);
if (pIter->totalLen == TD_DATA_ROWS_HEAD_LEN) {
pIter->row = NULL;
return;
}
pIter->row = (SDataRow)((char *)rows + TD_DATA_ROWS_HEAD_LEN);
pIter->len = TD_DATA_ROWS_HEAD_LEN + dataRowLen(pIter->row);
}
// Get the next row in Rows
SDataRow tdDataRowsNext(SDataRowsIter *pIter) {
SDataRow row = pIter->row;
if (row == NULL) return NULL;
if (pIter->len >= pIter->totalLen) {
pIter->row = NULL;
} else {
pIter->row = (char *)row + dataRowLen(row);
pIter->len += dataRowLen(row);
}
return row;
}
\ No newline at end of file
#include <stdlib.h> #include <stdlib.h>
#include "schema.h" #include "schema.h"
const int32_t rowDataLen[] = { const int32_t rowDataLen[] = {
sizeof(int8_t), // TD_DATATYPE_BOOL, sizeof(int8_t), // TD_DATATYPE_BOOL,
sizeof(int8_t), // TD_DATATYPE_TINYINT, sizeof(int8_t), // TD_DATATYPE_TINYINT,
...@@ -9,93 +10,127 @@ const int32_t rowDataLen[] = { ...@@ -9,93 +10,127 @@ const int32_t rowDataLen[] = {
sizeof(int64_t), // TD_DATATYPE_BIGINT, sizeof(int64_t), // TD_DATATYPE_BIGINT,
sizeof(float), // TD_DATATYPE_FLOAT, sizeof(float), // TD_DATATYPE_FLOAT,
sizeof(double), // TD_DATATYPE_DOUBLE, sizeof(double), // TD_DATATYPE_DOUBLE,
sizeof(int64_t), // TD_DATATYPE_TIMESTAMP
sizeof(int32_t), // TD_DATATYPE_VARCHAR, sizeof(int32_t), // TD_DATATYPE_VARCHAR,
sizeof(int32_t), // TD_DATATYPE_NCHAR, sizeof(int32_t), // TD_DATATYPE_NCHAR,
sizeof(int32_t) // TD_DATATYPE_BINARY sizeof(int32_t) // TD_DATATYPE_BINARY
}; };
static size_t tdGetEstimatedISchemaLen(SSchema *pSchema) { /**
size_t colNameLen = 0; * Create a new SColumn object
for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) { * ASSUMPTIONS: VALID PARAMETERS
colNameLen += (strlen(TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i))) + 1); *
} * @param type column type
* @param colId column ID
for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) { * @param bytes maximum bytes the col taken
colNameLen += (strlen(TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i))) + 1); *
* @return a SColumn object on success
* NULL for failure
*/
SColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes) {
SColumn *pCol = (SColumn *)calloc(1, sizeof(SColumn));
if (pCol == NULL) return NULL;
colSetType(pCol, type);
colSetColId(pCol, colId);
switch (type) {
case TD_DATATYPE_VARCHAR:
case TD_DATATYPE_NCHAR:
case TD_DATATYPE_BINARY:
colSetBytes(pCol, bytes);
break;
default:
colSetBytes(pCol, rowDataLen[type]);
break;
} }
return TD_ISCHEMA_HEADER_SIZE + (size_t)TD_SCHEMA_TOTAL_COLS(pSchema) + colNameLen; return pCol;
} }
static void tdUpdateColumnOffsets(SSchema *pSchema) { /**
int32_t offset = 0; * Free a SColumn object CREATED with tdNewCol
for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) */
{ void tdFreeCol(SColumn *pCol) {
SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); if (pCol) free(pCol);
TD_COLUMN_OFFSET(pCol) = offset;
offset += rowDataLen[TD_COLUMN_TYPE(pCol)];
}
offset = 0;
for (size_t i = 0; i < TD_SCHEMA_NTAGS(pSchema); i++) {
SColumn *pCol = TD_SCHEMA_TAG_AT(pSchema, i);
TD_COLUMN_OFFSET(pCol) = offset;
offset += rowDataLen[TD_COLUMN_TYPE(pCol)];
}
} }
SISchema tdConvertSchemaToInline(SSchema *pSchema) { void tdColCpy(SColumn *dst, SColumn *src) { memcpy((void *)dst, (void *)src, sizeof(SColumn)); }
size_t len = tdGetEstimatedISchemaLen(pSchema);
int32_t totalCols = TD_SCHEMA_TOTAL_COLS(pSchema); /**
// TODO: if use pISchema is reasonable? * Create a SSchema object with nCols columns
SISchema pISchema = malloc(len); * ASSUMPTIONS: VALID PARAMETERS
if (pSchema == NULL) { *
// TODO: add error handling * @param nCols number of columns the schema has
return NULL; *
} * @return a SSchema object for success
* NULL for failure
TD_ISCHEMA_LEN(pISchema) = (int32_t)len; */
memcpy((void *)TD_ISCHEMA_SCHEMA(pISchema), (void *)pSchema, sizeof(SSchema)); SSchema *tdNewSchema(int32_t nCols) {
TD_SCHEMA_COLS(TD_ISCHEMA_SCHEMA(pISchema)) = (SColumn *)(pISchema + TD_ISCHEMA_HEADER_SIZE); int32_t size = sizeof(SSchema) + sizeof(SColumn) * nCols;
memcpy((void *)TD_SCHEMA_COLS(TD_ISCHEMA_SCHEMA(pISchema)), (void *)TD_SCHEMA_COLS(pSchema),
sizeof(SColumn) * totalCols); SSchema *pSchema = (SSchema *)calloc(1, size);
if (pSchema == NULL) return NULL;
pSchema->numOfCols = nCols;
return pSchema;
}
char *pName = TD_ISCHEMA_COL_NAMES(pISchema); /**
for (int32_t i = 0; i < totalCols; i++) { * Free the SSchema object created by tdNewSchema or tdDupSchema
SColumn *pCol = TD_SCHEMA_COLUMN_AT(TD_ISCHEMA_SCHEMA(pISchema), i); */
char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i)); void tdFreeSchema(SSchema *pSchema) {
if (pSchema == NULL) free(pSchema);
}
TD_COLUMN_NAME(pCol) = pName; SSchema *tdDupSchema(SSchema *pSchema) {
SSchema *tSchema = tdNewSchema(schemaNCols(pSchema));
if (tSchema == NULL) return NULL;
size_t tlen = strlen(colName) + 1; int32_t size = sizeof(SSchema) + sizeof(SColumn) * schemaNCols(pSchema);
memcpy((void *)pName, (void *)colName, tlen); memcpy((void *)tSchema, (void *)pSchema, size);
pName += tlen;
}
return pISchema; return tSchema;
} }
int32_t tdGetColumnIdxByName(SSchema *pSchema, char *colName) { /**
for (int32_t i = 0; i < TD_SCHEMA_TOTAL_COLS(pSchema); i++) { * Function to update each columns's offset field in the schema.
SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); * ASSUMPTIONS: VALID PARAMETERS
if (strcmp(colName, TD_COLUMN_NAME(pCol)) == 0) { */
return i; void tdUpdateSchema(SSchema *pSchema) {
} SColumn *pCol = NULL;
int32_t offset = 0;
for (int i = 0; i < schemaNCols(pSchema); i++) {
pCol = schemaColAt(pSchema, i);
colSetOffset(pCol, offset);
offset += rowDataLen[pCol->type];
} }
return -1;
} }
int32_t tdGetColumnIdxById(SSchema *pSchema, int32_t colId) { /**
for (int32_t i = 0; i < TD_SCHEMA_TOTAL_COLS(pSchema); i++) { * Get the maximum size of a row data with the schema
SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i); */
if (TD_COLUMN_ID(pCol) == colId) { int32_t tdMaxRowDataBytes(SSchema *pSchema) {
return i; int32_t size = 0;
SColumn *pCol = NULL;
for (int i = 0; i < schemaNCols(pSchema); i++) {
pCol = schemaColAt(pSchema, i);
size += rowDataLen[pCol->type];
switch (pCol->type) {
case TD_DATATYPE_VARCHAR:
size += (pCol->bytes + 1); // TODO: remove literal here
break;
case TD_DATATYPE_NCHAR:
size += (pCol->bytes + 4); // TODO: check and remove literal here
break;
case TD_DATATYPE_BINARY:
size += pCol->bytes;
break;
default:
break;
} }
} }
return -1;
}
SSchema *tdDupSchema(SSchema *pSchema) { return size;
return NULL;
} }
\ No newline at end of file
add_subdirectory(common)
add_subdirectory(tsdb)
\ No newline at end of file
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
message(STATUS "COMMON: ${SOURCE_LIST}")
add_executable(commonTests ${SOURCE_LIST})
target_link_libraries(commonTests gtest gtest_main pthread common)
add_test(
NAME
unit
COMMAND
${CMAKE_CURRENT_BINARY_DIR}/commonTests
)
\ No newline at end of file
#include <gtest/gtest.h>
#include "dataformat.h"
TEST(commonDataTests, createDataRow) {
EXPECT_EQ(1, 2/2);
}
\ No newline at end of file
#include <gtest/gtest.h>
#include <stdlib.h>
#include <stdio.h>
#include "schema.h"
TEST(commonSchemaTests, createSchema) {
EXPECT_EQ(1, 2/2);
}
\ No newline at end of file
#include <gtest/gtest.h>
#include <stdlib.h>
#include "tsdb.h"
TEST(TsdbTest, createTsdbRepo) {
STsdbCfg config;
config.precision = TSDB_PRECISION_MILLI;
config.tsdbId = 0;
config.maxTables = 100;
config.daysPerFile = 10;
config.keep = 3650;
config.minRowsPerFileBlock = 100;
config.maxRowsPerFileBlock = 4096;
config.maxCacheSize = 4 * 1024 * 1024;
tsdb_repo_t *pRepo = tsdbCreateRepo("/root/mnt/test/vnode0", &config, NULL);
ASSERT_NE(pRepo, nullptr);
tsdbCloseRepo(pRepo);
}
\ No newline at end of file
...@@ -5,6 +5,9 @@ target_link_libraries(tsdb common tutil) ...@@ -5,6 +5,9 @@ target_link_libraries(tsdb common tutil)
target_include_directories(tsdb target_include_directories(tsdb
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc" PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PUBLIC "${CMAKE_SOURCE_DIR}/src/inc"
PUBLIC "${CMAKE_SOURCE_DIR}/src/util/inc" PUBLIC "${CMAKE_SOURCE_DIR}/src/util/inc"
PUBLIC "${CMAKE_SOURCE_DIR}/src/os/linux/inc" PUBLIC "${CMAKE_SOURCE_DIR}/src/os/linux/inc"
) )
\ No newline at end of file
add_subdirectory(tests)
\ No newline at end of file
...@@ -46,8 +46,9 @@ typedef struct { ...@@ -46,8 +46,9 @@ typedef struct {
// Submit message for one table // Submit message for one table
typedef struct { typedef struct {
STableId tableId; STableId tableId;
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t numOfRows; // number of rows data int32_t len; // message length
char data[]; char data[];
} SSubmitBlock; } SSubmitBlock;
...@@ -98,6 +99,17 @@ typedef struct { ...@@ -98,6 +99,17 @@ typedef struct {
int64_t tableTotalDiskSize; // In bytes int64_t tableTotalDiskSize; // In bytes
} STableInfo; } STableInfo;
/**
* Create a configuration for TSDB default
* @return a pointer to a configuration. the configuration must call tsdbFreeCfg to free memory after usage
*/
STsdbCfg *tsdbCreateDefaultCfg();
/**
* Free
*/
void tsdbFreeCfg(STsdbCfg *pCfg);
/** /**
* Create a new TSDB repository * Create a new TSDB repository
* @param rootDir the TSDB repository root directory * @param rootDir the TSDB repository root directory
...@@ -171,7 +183,7 @@ int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg); ...@@ -171,7 +183,7 @@ int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
* *
* @return 0 for success, -1 for failure and the error number is set * @return 0 for success, -1 for failure and the error number is set
*/ */
int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tid); int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
/** /**
* Get the information of a table in the repository * Get the information of a table in the repository
...@@ -187,13 +199,11 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid); ...@@ -187,13 +199,11 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid);
/** /**
* Insert data to a table in a repository * Insert data to a table in a repository
* @param pRepo the TSDB repository handle * @param pRepo the TSDB repository handle
* @param tid the table ID to insert to
* @param pData the data to insert (will give a more specific description) * @param pData the data to insert (will give a more specific description)
* @param error the error number to set when failure occurs
* *
* @return the number of points inserted, -1 for failure and the error number is set * @return the number of points inserted, -1 for failure and the error number is set
*/ */
int32_t tsdbInsertData(tsdb_repo_t *pRepo, STableId tid, char *pData); int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg);
// -- FOR QUERY TIME SERIES DATA // -- FOR QUERY TIME SERIES DATA
......
...@@ -44,7 +44,7 @@ typedef struct STSDBCache { ...@@ -44,7 +44,7 @@ typedef struct STSDBCache {
int32_t numOfBlocks; int32_t numOfBlocks;
STSDBCacheBlock *cacheList; STSDBCacheBlock *cacheList;
void * current; void * current;
} SCacheHandle; } STsdbCache;
// ---- Operation on STSDBCacheBlock // ---- Operation on STSDBCacheBlock
#define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData) #define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData)
...@@ -53,8 +53,9 @@ typedef struct STSDBCache { ...@@ -53,8 +53,9 @@ typedef struct STSDBCache {
#define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next) #define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next)
#define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev) #define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev)
SCacheHandle *tsdbCreateCache(int32_t numOfBlocks); STsdbCache *tsdbCreateCache(int32_t numOfBlocks);
int32_t tsdbFreeCache(SCacheHandle *pHandle); int32_t tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -17,7 +17,9 @@ ...@@ -17,7 +17,9 @@
#include <pthread.h> #include <pthread.h>
#include "tsdb.h"
#include "dataformat.h" #include "dataformat.h"
#include "tskiplist.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -34,6 +36,8 @@ typedef enum { ...@@ -34,6 +36,8 @@ typedef enum {
TSDB_STABLE // table created from super table TSDB_STABLE // table created from super table
} TSDB_TABLE_TYPE; } TSDB_TABLE_TYPE;
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
typedef struct STable { typedef struct STable {
STableId tableId; STableId tableId;
TSDB_TABLE_TYPE type; TSDB_TABLE_TYPE type;
...@@ -76,8 +80,9 @@ typedef struct STable { ...@@ -76,8 +80,9 @@ typedef struct STable {
typedef struct { typedef struct {
int32_t maxTables; int32_t maxTables;
int32_t nTables;
STable **tables; // array of normal tables STable **tables; // array of normal tables
STable * stables; // linked list of super tables STable * stables; // linked list of super tables // TODO use container to implement this
void * tableMap; // hash map of uid ==> STable * void * tableMap; // hash map of uid ==> STable *
} STsdbMeta; } STsdbMeta;
...@@ -105,11 +110,13 @@ STsdbMeta *tsdbCreateMeta(int32_t maxTables); ...@@ -105,11 +110,13 @@ STsdbMeta *tsdbCreateMeta(int32_t maxTables);
int32_t tsdbFreeMeta(STsdbMeta *pMeta); int32_t tsdbFreeMeta(STsdbMeta *pMeta);
// Recover the meta handle from the file // Recover the meta handle from the file
STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir); STsdbMeta *tsdbOpenMeta(char *tsdbDir);
int32_t tsdbCreateTableImpl(STsdbMeta *pHandle, STableCfg *pCfg);
int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, char *pData); int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h> #include <stdlib.h>
#include "tsdbCache.h" #include "tsdbCache.h"
STsdbCache *tsdbCreateCache(int32_t numOfBlocks) {
STsdbCache *pCacheHandle = (STsdbCache *)malloc(sizeof(STsdbCache));
if (pCacheHandle == NULL) {
// TODO : deal with the error
return NULL;
}
SCacheHandle *tsdbCreateCache(int32_t numOfBlocks) { return pCacheHandle;
SCacheHandle *pCacheHandle = (SCacheHandle *)malloc(sizeof(SCacheHandle)); }
if (pCacheHandle == NULL) {
// TODO : deal with the error
return NULL;
}
return pCacheHandle; int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; }
} void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) {
// TODO: implement here
void *ptr = malloc(bytes);
if (ptr == NULL) return NULL;
int32_t tsdbFreeCache(SCacheHandle *pHandle) { return 0; } return ptr;
\ No newline at end of file }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
......
#include <stdio.h> #include <dirent.h>
#include <fcntl.h> #include <fcntl.h>
#include <limits.h>
#include <pthread.h> #include <pthread.h>
#include <stdint.h> #include <stdint.h>
#include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include <string.h>
#include <dirent.h>
// #include "taosdef.h" // #include "taosdef.h"
// #include "disk.h" // #include "disk.h"
...@@ -15,6 +16,31 @@ ...@@ -15,6 +16,31 @@
#include "tsdbCache.h" #include "tsdbCache.h"
#include "tsdbFile.h" #include "tsdbFile.h"
#include "tsdbMeta.h" #include "tsdbMeta.h"
#include "tutil.h"
#include "tskiplist.h"
#define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision
#define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO))
#define TSDB_MIN_ID 0
#define TSDB_MAX_ID INT_MAX
#define TSDB_MIN_TABLES 10
#define TSDB_MAX_TABLES 100000
#define TSDB_DEFAULT_TABLES 1000
#define TSDB_DEFAULT_DAYS_PER_FILE 10
#define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_DAYS_PER_FILE 60
#define TSDB_DEFAULT_MIN_ROW_FBLOCK 100
#define TSDB_MIN_MIN_ROW_FBLOCK 10
#define TSDB_MAX_MIN_ROW_FBLOCK 1000
#define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096
#define TSDB_MIN_MAX_ROW_FBLOCK 200
#define TSDB_MAX_MAX_ROW_FBLOCK 10000
#define TSDB_DEFAULT_KEEP 3650
#define TSDB_MIN_KEEP 1
#define TSDB_MAX_KEEP INT_MAX
#define TSDB_DEFAULT_CACHE_SIZE (16 * 1024 * 1024) // 16M
#define TSDB_MIN_CACHE_SIZE (4 * 1024 * 1024) // 4M
#define TSDB_MAX_CACHE_SIZE (1024 * 1024 * 1024) // 1G
enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING }; enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING };
...@@ -27,7 +53,7 @@ typedef struct _tsdb_repo { ...@@ -27,7 +53,7 @@ typedef struct _tsdb_repo {
STsdbMeta *tsdbMeta; STsdbMeta *tsdbMeta;
// The cache Handle // The cache Handle
SCacheHandle *tsdbCache; STsdbCache *tsdbCache;
// Disk tier handle for multi-tier storage // Disk tier handle for multi-tier storage
void *diskTier; void *diskTier;
...@@ -49,17 +75,37 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); ...@@ -49,17 +75,37 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
static int tsdbOpenMetaFile(char *tsdbDir); static int tsdbOpenMetaFile(char *tsdbDir);
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg); static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg);
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
#define TSDB_IS_REPO_ACTIVE(pRepo) ((pRepo)->state == TSDB_REPO_STATE_ACTIVE) #define TSDB_IS_REPO_ACTIVE(pRepo) ((pRepo)->state == TSDB_REPO_STATE_ACTIVE)
#define TSDB_IS_REPO_CLOSED(pRepo) ((pRepo)->state == TSDB_REPO_STATE_CLOSED) #define TSDB_IS_REPO_CLOSED(pRepo) ((pRepo)->state == TSDB_REPO_STATE_CLOSED)
tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) { STsdbCfg *tsdbCreateDefaultCfg() {
STsdbCfg *pCfg = (STsdbCfg *)malloc(sizeof(STsdbCfg));
if (pCfg == NULL) return NULL;
pCfg->precision = -1;
pCfg->tsdbId = 0;
pCfg->maxTables = -1;
pCfg->daysPerFile = -1;
pCfg->minRowsPerFileBlock = -1;
pCfg->maxRowsPerFileBlock = -1;
pCfg->keep = -1;
pCfg->maxCacheSize = -1;
return pCfg;
}
void tsdbFreeCfg(STsdbCfg *pCfg) {
if (pCfg != NULL) free(pCfg);
}
tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) {
if (rootDir == NULL) return NULL; if (rootDir == NULL) return NULL;
if (access(rootDir, F_OK|R_OK|W_OK) == -1) return NULL; if (access(rootDir, F_OK | R_OK | W_OK) == -1) return NULL;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) { if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) {
return NULL; return NULL;
...@@ -200,33 +246,98 @@ int32_t tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) { ...@@ -200,33 +246,98 @@ int32_t tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) {
return 0; return 0;
} }
int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tid) { int32_t tsdbDropTable(tsdb_repo_t *repo, STableId tableId) {
return 0; // TODO
if (repo == NULL) return -1;
STsdbRepo *pRepo = (STsdbRepo *)repo;
return tsdbDropTableImpl(pRepo->tsdbMeta, tableId);
} }
STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid) { STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) {
// TODO // TODO
return NULL; return NULL;
} }
int32_t tsdbInsertData(tsdb_repo_t *repo, STableId tableId, char *pData) { // TODO: need to return the number of data inserted
STsdbRepo *pRepo = (STsdbRepo *)repo; int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
STsdbRepo * pRepo = (STsdbRepo *)repo;
SSubmitBlock *pBlock = pMsg->data;
tsdbInsertDataImpl(pRepo->tsdbMeta, tableId, pData); for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message
if (tsdbInsertDataToTable(repo, pBlock) < 0) {
return -1;
}
pBlock = ((char *)pBlock) + sizeof(SSubmitBlock) + pBlock->len;
}
return 0; return 0;
} }
// Check the configuration and set default options // Check the configuration and set default options
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// TODO // Check precision
if (pCfg->precision == -1) {
pCfg->precision = TSDB_DEFAULT_PRECISION;
} else {
if (!IS_VALID_PRECISION(pCfg->precision)) return -1;
}
// Check tsdbId
if (pCfg->tsdbId < 0) return -1;
// Check MaxTables
if (pCfg->maxTables == -1) {
pCfg->maxTables = TSDB_DEFAULT_TABLES;
} else {
if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) return -1;
}
// Check daysPerFile
if (pCfg->daysPerFile == -1) {
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
} else {
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1;
}
// Check minRowsPerFileBlock and maxRowsPerFileBlock
if (pCfg->minRowsPerFileBlock == -1) {
pCfg->minRowsPerFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK;
} else {
if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK)
return -1;
}
if (pCfg->maxRowsPerFileBlock == -1) {
pCfg->maxRowsPerFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK;
} else {
if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK)
return -1;
}
if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) return -1;
// Check keep
if (pCfg->keep == -1) {
pCfg->keep = TSDB_DEFAULT_KEEP;
} else {
if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) return -1;
}
// Check maxCacheSize
if (pCfg->maxCacheSize == -1) {
pCfg->maxCacheSize = TSDB_DEFAULT_CACHE_SIZE;
} else {
if (pCfg->maxCacheSize < TSDB_MIN_CACHE_SIZE || pCfg->maxCacheSize > TSDB_MAX_CACHE_SIZE) return -1;
}
return 0; return 0;
} }
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) { static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META); char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META);
int fd = open(metaFname, O_WRONLY|O_CREAT); int fd = open(metaFname, O_WRONLY | O_CREAT);
if (fd < 0) { if (fd < 0) {
return -1; return -1;
} }
...@@ -290,5 +401,50 @@ static int tsdbOpenMetaFile(char *tsdbDir) { ...@@ -290,5 +401,50 @@ static int tsdbOpenMetaFile(char *tsdbDir) {
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) { static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) {
// TODO: read tsdb configuration from file // TODO: read tsdb configuration from file
// recover tsdb meta // recover tsdb meta
return 0;
}
static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
// TODO
int32_t level = 0;
int32_t headSize = 0;
tSkipListRandNodeInfo(pTable->content.pData, &level, &headSize);
// Copy row into the memory
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row));
if (pNode == NULL) {
// TODO: deal with allocate failure
}
pNode->level = level;
tdDataRowCpy(SL_GET_NODE_DATA(pNode), row);
// Insert the skiplist node into the data
tsdbInsertRowToTableImpl(pNode, pTable);
return 0;
}
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId);
if (pTable == NULL) {
return -1;
}
SDataRows rows = pBlock->data;
SDataRowsIter rDataIter, *pIter;
pIter = &rDataIter;
SDataRow row;
tdInitSDataRowsIter(rows, pIter);
while ((row = tdDataRowsNext(pIter)) != NULL) {
if (tdInsertRowToTable(pRepo, row, pTable) < 0) {
// TODO: deal with the error here
}
}
return 0; return 0;
} }
\ No newline at end of file
#include <stdlib.h> #include <stdlib.h>
// #include "taosdef.h" // #include "taosdef.h"
#include "hash.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tsdb.h" #include "tsdb.h"
#include "taosdef.h"
#include "tsdbMeta.h" #include "tsdbMeta.h"
#include "hash.h"
#include "tsdbCache.h"
#define TSDB_MIN_TABLES 10 #define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
#define TSDB_MAX_TABLES 100000
#define TSDB_DEFAULT_NSTABLES 10
#define IS_VALID_MAX_TABLES(maxTables) (((maxTables) >= TSDB_MIN_TABLES) && ((maxTables) <= TSDB_MAX_TABLES))
static int tsdbFreeTable(STable *pTable); static int tsdbFreeTable(STable *pTable);
static int32_t tsdbCheckTableCfg(STableCfg *pCfg); static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
static STable *tsdbGetTableByUid(int64_t uid); static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
STsdbMeta *tsdbCreateMeta(int32_t maxTables) { STsdbMeta *tsdbCreateMeta(int32_t maxTables) {
if (!IS_VALID_MAX_TABLES(maxTables)) return NULL;
STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
return NULL; return NULL;
} }
pMeta->maxTables = maxTables; pMeta->maxTables = maxTables;
pMeta->nTables = 0;
pMeta->stables = NULL; pMeta->stables = NULL;
pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *)); pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));
if (pMeta->tables == NULL) { if (pMeta->tables == NULL) {
...@@ -76,52 +73,60 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -76,52 +73,60 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
} }
STable *pSTable = NULL; STable *pSTable = NULL;
int newSuper = 0;
if (pCfg->stableUid > 0) { // to create a TSDB_STABLE if (IS_CREATE_STABLE(pCfg)) { // to create a TSDB_STABLE, check if super table exists
pSTable = tsdbGetTableByUid(pCfg->stableUid); pSTable = tsdbGetTableByUid(pMeta, pCfg->stableUid);
if (pSTable == NULL) { // super table not exists, try to create it if (pSTable == NULL) { // super table not exists, try to create it
newSuper = 1;
pSTable = (STable *)calloc(1, sizeof(STable)); pSTable = (STable *)calloc(1, sizeof(STable));
if (pSTable == NULL) return -1; if (pSTable == NULL) return -1;
pSTable->tableId.uid = pCfg->stableUid; pSTable->tableId.uid = pCfg->stableUid;
pSTable->tableId.tid = -1; pSTable->tableId.tid = -1;
pSTable->type = TSDB_SUPER_TABLE; pSTable->type = TSDB_SUPER_TABLE;
pSTable->createdTime = pCfg->createdTime; // The created time is not required // pSTable->createdTime = pCfg->createdTime; // The created time is not required
pSTable->stableUid = -1; pSTable->stableUid = -1;
pSTable->numOfCols = pCfg->numOfCols; pSTable->numOfCols = pCfg->numOfCols;
pSTable->pSchema = tdDupSchema(pCfg->schema); pSTable->pSchema = tdDupSchema(pCfg->schema);
// pSTable->content.pIndex = tSkipListCreate(5, 0, 10); // TODO: change here pSTable->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
tsdbAddTable(pMeta, pSTable); 0, NULL); // Allow duplicate key, no lock
if (pSTable->content.pIndex == NULL) {
free(pSTable);
return -1;
}
} else { } else {
if (pSTable->type != TSDB_SUPER_TABLE) return NULL; if (pSTable->type != TSDB_SUPER_TABLE) return -1;
} }
} }
STable *pTable = (STable *)malloc(sizeof(STable)); STable *pTable = (STable *)malloc(sizeof(STable));
if (pTable == NULL) { if (pTable == NULL) {
if (newSuper) tsdbFreeTable(pSTable);
return -1; return -1;
} }
pTable->tableId = pCfg->tableId; pTable->tableId = pCfg->tableId;
pTable->createdTime = pCfg->createdTime; pTable->createdTime = pCfg->createdTime;
if (1 /* */) { // TSDB_STABLE if (IS_CREATE_STABLE(pCfg)) { // TSDB_STABLE
pTable->type = TSDB_STABLE; pTable->type = TSDB_STABLE;
pTable->stableUid = pCfg->stableUid; pTable->stableUid = pCfg->stableUid;
pTable->pTagVal = tdSDataRowDup(pCfg->tagValues); pTable->pTagVal = tdDataRowDup(pCfg->tagValues);
} else { // TSDB_NTABLE } else { // TSDB_NTABLE
pTable->type = TSDB_NTABLE; pTable->type = TSDB_NTABLE;
pTable->stableUid = -1; pTable->stableUid = -1;
pTable->pSchema = tdDupSchema(pCfg->schema); pTable->pSchema = tdDupSchema(pCfg->schema);
} }
// pTable->content.pData = tSkipListCreate(5, 0, 10); // TODO: change here pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, 0, 8, 0, 0, NULL);
tsdbAddTable(pMeta, pTable); if (newSuper) tsdbAddTableToMeta(pMeta, pSTable);
tsdbAddTableToMeta(pMeta, pTable);
return 0; return 0;
} }
STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir) { STsdbMeta *tsdbOpenMeta(char *tsdbDir) {
// Open meta file for reading // TODO : Open meta file for reading
STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
...@@ -131,29 +136,85 @@ STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir) { ...@@ -131,29 +136,85 @@ STsdbMeta *tsdbOpenMetaHandle(char *tsdbDir) {
return pMeta; return pMeta;
} }
int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, char *pData) { /**
STable *pTable = pMeta->tables[tableId.tid]; * Check if a table is valid to insert.
* @return NULL for invalid and the pointer to the table if valid
*/
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) {
STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid);
if (pTable == NULL) { if (pTable == NULL) {
// TODO: deal with the error here return NULL;
return 0; }
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) return NULL;
if (pTable->tableId.tid != tableId.tid) return NULL;
return pTable;
}
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
if (pMeta == NULL) return -1;
STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid);
if (pTable == NULL) return -1;
if (pTable->type == TSDB_SUPER_TABLE) {
// TODO: implement drop super table
return -1;
} else {
pMeta->tables[pTable->tableId.tid] = NULL;
pMeta->nTables--;
assert(pMeta->nTables >= 0);
if (pTable->type == TSDB_STABLE) {
tsdbRemoveTableFromIndex(pMeta, pTable);
}
tsdbFreeTable(pTable);
}
return 0;
}
int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
tSkipListPut(pTable->content.pData, pNode);
return 0;
}
static int tsdbFreeTable(STable *pTable) {
// TODO: finish this function
if (pTable->type == TSDB_STABLE) {
tdFreeDataRow(pTable->pTagVal);
} else {
tdFreeSchema(pTable->pSchema);
} }
if (pTable->tableId.uid != tableId.uid) { // Free content
// TODO: deal with the error here if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) {
return 0; tSkipListDestroy(pTable->content.pIndex);
} else {
tSkipListDestroy(pTable->content.pData);
} }
free(pTable);
return 0; return 0;
} }
static int tsdbFreeTable(STable *pTable) { return 0; } static int32_t tsdbCheckTableCfg(STableCfg *pCfg) {
// TODO
return 0;
}
static int32_t tsdbCheckTableCfg(STableCfg *pCfg) { return 0; } STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
void *ptr = taosGetDataFromHashTable(pMeta->tableMap, (char *)(&uid), sizeof(uid));
static STable *tsdbGetTableByUid(int64_t uid) { return NULL; } if (ptr == NULL) return NULL;
static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable) { return *(STable **)ptr;
if (pTable->type == TSDB_SUPER_TABLE) { }
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable) {
if (pTable->type == TSDB_SUPER_TABLE) {
// add super table to the linked list
if (pMeta->stables == NULL) { if (pMeta->stables == NULL) {
pMeta->stables = pTable; pMeta->stables = pTable;
pTable->next = NULL; pTable->next = NULL;
...@@ -163,20 +224,39 @@ static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable) { ...@@ -163,20 +224,39 @@ static int tsdbAddTable(STsdbMeta *pMeta, STable *pTable) {
pTable->next = pTemp; pTable->next = pTemp;
} }
} else { } else {
// add non-super table to the array
pMeta->tables[pTable->tableId.tid] = pTable; pMeta->tables[pTable->tableId.tid] = pTable;
if (pTable->type == TSDB_STABLE) { if (pTable->type == TSDB_STABLE) {
// add STABLE to the index
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
pMeta->nTables++;
} }
return tsdbAddTableIntoMap(pMeta, pTable); return tsdbAddTableIntoMap(pMeta, pTable);
} }
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
// TODO
return 0;
}
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
// TODO: add the table to the map // TODO: add the table to the map
int64_t uid = pTable->tableId.uid;
if (taosAddToHashTable(pMeta->tableMap, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) {
return -1;
}
return 0; return 0;
} }
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
assert(pTable->type == TSDB_STABLE);
// TODO
return 0;
}
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
assert(pTable->type == TSDB_STABLE);
// TODO // TODO
return 0; return 0;
} }
\ No newline at end of file
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
message(STATUS "TSDB: ${SOURCE_LIST}")
add_executable(tsdbTests ${SOURCE_LIST}) add_executable(tsdbTests ${SOURCE_LIST})
target_link_libraries(tsdbTests gtest gtest_main pthread tsdb) target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb)
add_test( add_test(
NAME NAME
......
#include <gtest/gtest.h>
#include <stdlib.h>
#include "tsdb.h"
#include "tsdbMeta.h"
TEST(TsdbTest, DISABLED_createTable) {
STsdbMeta *pMeta = tsdbCreateMeta(100);
ASSERT_NE(pMeta, nullptr);
STableCfg config;
config.tableId.tid = 0;
config.tableId.uid = 98868728187539L;
config.numOfCols = 5;
config.schema = tdNewSchema(config.numOfCols);
for (int i = 0; i < schemaNCols(config.schema); i++) {
SColumn *pCol = tdNewCol(TD_DATATYPE_BIGINT, i, 0);
tdColCpy(schemaColAt(config.schema, i), pCol);
tdFreeCol(pCol);
}
config.tagValues = nullptr;
tsdbCreateTableImpl(pMeta, &config);
STable *pTable = tsdbGetTableByUid(pMeta, config.tableId.uid);
ASSERT_NE(pTable, nullptr);
}
TEST(TsdbTest, createRepo) {
STsdbCfg *pCfg = tsdbCreateDefaultCfg();
// Create a tsdb repository
tsdb_repo_t *pRepo = tsdbCreateRepo("/root/mnt/test/vnode0", pCfg, NULL);
ASSERT_NE(pRepo, nullptr);
tsdbFreeCfg(pCfg);
// create a normal table in this repository
STableCfg config;
config.tableId.tid = 0;
config.tableId.uid = 98868728187539L;
config.numOfCols = 5;
config.schema = tdNewSchema(config.numOfCols);
SColumn *pCol = tdNewCol(TD_DATATYPE_TIMESTAMP, 0, 0);
tdColCpy(schemaColAt(config.schema, 0), pCol);
tdFreeCol(pCol);
for (int i = 1; i < schemaNCols(config.schema); i++) {
pCol = tdNewCol(TD_DATATYPE_BIGINT, i, 0);
tdColCpy(schemaColAt(config.schema, i), pCol);
tdFreeCol(pCol);
}
tsdbCreateTable(pRepo, &config);
// Write some data
int32_t size = sizeof(SSubmitMsg) + sizeof(SSubmitBlock) + tdMaxRowDataBytes(config.schema) * 10 + sizeof(int32_t);
tdUpdateSchema(config.schema);
SSubmitMsg *pMsg = (SSubmitMsg *)malloc(size);
pMsg->numOfTables = 1; // TODO: use api
SSubmitBlock *pBlock = (SSubmitBlock *)pMsg->data;
pBlock->tableId = {.uid = 98868728187539L, .tid = 0};
pBlock->sversion = 0;
pBlock->len = sizeof(SSubmitBlock);
SDataRows rows = pBlock->data;
dataRowsInit(rows);
SDataRow row = tdNewDataRow(tdMaxRowDataBytes(config.schema));
int64_t ttime = 1583508800000;
for (int i = 0; i < 10; i++) { // loop over rows
ttime += (10000 * i);
tdDataRowReset(row);
for (int j = 0; j < schemaNCols(config.schema); j++) {
if (j == 0) { // set time stamp
tdAppendColVal(row, (void *)(&ttime), schemaColAt(config.schema, j), 40);
} else { // set other fields
int32_t val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(config.schema, j), 40);
}
}
tdDataRowsAppendRow(rows, row);
}
tsdbInsertData(pRepo, pMsg);
tdFreeDataRow(row);
tdFreeSchema(config.schema);
tsdbDropRepo(pRepo);
}
\ No newline at end of file
#if !defined(_TD_WAL_H_) /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_WAL_H_
#define _TD_WAL_H_ #define _TD_WAL_H_
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef void walh; // WAL HANDLE
#endif // _TD_WAL_H_ walh *vnodeOpenWal(int vnode, uint8_t op);
int vnodeCloseWal(walh *pWal);
int vnodeRenewWal(walh *pWal);
int vnodeWriteWal(walh *pWal, void *cont, int contLen);
int vnodeSyncWal(walh *pWal);
#ifdef __cplusplus
}
#endif
#endif // _TD_WAL_H_
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册