Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5e6ed4b4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
5e6ed4b4
编写于
3月 30, 2020
作者:
H
hzcheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '2.0' into feature/2.0tsdb
上级
5b10eda6
fd6c41eb
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
722 addition
and
128 deletion
+722
-128
src/util/inc/tskiplist.h
src/util/inc/tskiplist.h
+24
-18
src/util/src/tskiplist.c
src/util/src/tskiplist.c
+13
-89
src/util/tests/cacheTest.cpp
src/util/tests/cacheTest.cpp
+2
-2
src/util/tests/skiplistTest.cpp
src/util/tests/skiplistTest.cpp
+314
-0
src/vnode/tsdb/inc/tsdbFile.h
src/vnode/tsdb/inc/tsdbFile.h
+3
-0
src/vnode/tsdb/src/tsdbMain.c
src/vnode/tsdb/src/tsdbMain.c
+8
-3
src/vnode/tsdb/src/tsdbMeta.c
src/vnode/tsdb/src/tsdbMeta.c
+2
-2
src/vnode/tsdb/src/tsdbRead.c
src/vnode/tsdb/src/tsdbRead.c
+354
-13
tests/examples/c/demo.c
tests/examples/c/demo.c
+2
-1
未找到文件。
src/util/inc/tskiplist.h
浏览文件 @
5e6ed4b4
...
@@ -51,7 +51,7 @@ typedef struct SSkipListNode {
...
@@ -51,7 +51,7 @@ typedef struct SSkipListNode {
#define SL_GET_BACKWARD_POINTER(n, _l) \
#define SL_GET_BACKWARD_POINTER(n, _l) \
((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)]
((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)]
#define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level))
#define SL_GET_NODE_DATA(n) ((char
*)(n) + SL_NODE_HEADER_SIZE((n)->level))
#define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n)))
#define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n)))
#define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n))
#define SL_GET_NODE_LEVEL(n) *(uint8_t *)((n))
...
@@ -106,25 +106,25 @@ typedef struct tSkipListState {
...
@@ -106,25 +106,25 @@ typedef struct tSkipListState {
typedef
struct
SSkipListKeyInfo
{
typedef
struct
SSkipListKeyInfo
{
uint8_t
dupKey
:
2
;
// if allow duplicated key in the skip list
uint8_t
dupKey
:
2
;
// if allow duplicated key in the skip list
uint8_t
type
:
6
;
// key type
uint8_t
type
:
4
;
// key type
uint8_t
freeNode
:
2
;
// free node when destroy the skiplist
uint8_t
len
;
// maximum key length, used in case of string key
uint8_t
len
;
// maximum key length, used in case of string key
}
SSkipListKeyInfo
;
}
SSkipListKeyInfo
;
typedef
struct
SSkipList
{
typedef
struct
SSkipList
{
__compar_fn_t
comparFn
;
__compar_fn_t
comparFn
;
__sl_key_fn_t
keyFn
;
__sl_key_fn_t
keyFn
;
uint32_t
size
;
uint32_t
size
;
uint8_t
maxLevel
;
uint8_t
maxLevel
;
uint8_t
level
;
uint8_t
level
;
SSkipListKeyInfo
keyInfo
;
SSkipListKeyInfo
keyInfo
;
pthread_rwlock_t
*
lock
;
pthread_rwlock_t
*
lock
;
SSkipListNode
*
pHead
;
SSkipListNode
*
pHead
;
// point to the first element
SSkipListNode
*
pTail
;
// point to the last element
void
*
lastKey
;
// last key in the skiplist
#if SKIP_LIST_RECORD_PERFORMANCE
#if SKIP_LIST_RECORD_PERFORMANCE
tSkipListState
state
;
// skiplist state
tSkipListState
state
;
// skiplist state
#endif
#endif
}
SSkipList
;
}
SSkipList
;
/*
/*
...
@@ -147,7 +147,7 @@ typedef struct SSkipListIterator {
...
@@ -147,7 +147,7 @@ typedef struct SSkipListIterator {
* @return
* @return
*/
*/
SSkipList
*
tSkipListCreate
(
uint8_t
nMaxLevel
,
uint8_t
keyType
,
uint8_t
keyLen
,
uint8_t
dupKey
,
uint8_t
threadsafe
,
SSkipList
*
tSkipListCreate
(
uint8_t
nMaxLevel
,
uint8_t
keyType
,
uint8_t
keyLen
,
uint8_t
dupKey
,
uint8_t
threadsafe
,
__sl_key_fn_t
fn
);
uint8_t
freeNode
,
__sl_key_fn_t
fn
);
/**
/**
*
*
...
@@ -182,21 +182,28 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode);
...
@@ -182,21 +182,28 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode);
* @param keyType
* @param keyType
* @return
* @return
*/
*/
SArray
*
tSkipListGet
(
SSkipList
*
pSkipList
,
SSkipListKey
pKey
,
int16_t
keyType
);
SArray
*
tSkipListGet
(
SSkipList
*
pSkipList
,
SSkipListKey
pKey
,
int16_t
keyType
);
/**
/**
* get the size of skip list
* get the size of skip list
* @param pSkipList
* @param pSkipList
* @return
* @return
*/
*/
size_t
tSkipListGetSize
(
const
SSkipList
*
pSkipList
);
size_t
tSkipListGetSize
(
const
SSkipList
*
pSkipList
);
/**
* display skip list of the given level, for debug purpose only
* @param pSkipList
* @param nlevel
*/
void
tSkipListPrint
(
SSkipList
*
pSkipList
,
int16_t
nlevel
);
/**
/**
* create skiplist iterator
* create skiplist iterator
* @param pSkipList
* @param pSkipList
* @return
* @return
*/
*/
SSkipListIterator
*
tSkipListCreateIter
(
SSkipList
*
pSkipList
);
SSkipListIterator
*
tSkipListCreateIter
(
SSkipList
*
pSkipList
);
/**
/**
* forward the skip list iterator
* forward the skip list iterator
...
@@ -217,7 +224,7 @@ SSkipListNode *tSkipListIterGet(SSkipListIterator *iter);
...
@@ -217,7 +224,7 @@ SSkipListNode *tSkipListIterGet(SSkipListIterator *iter);
* @param iter
* @param iter
* @return
* @return
*/
*/
void
*
tSkipListDestroyIter
(
SSkipListIterator
*
iter
);
void
*
tSkipListDestroyIter
(
SSkipListIterator
*
iter
);
/*
/*
* remove only one node of the pKey value.
* remove only one node of the pKey value.
...
@@ -234,7 +241,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey);
...
@@ -234,7 +241,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey);
*/
*/
void
tSkipListRemoveNode
(
SSkipList
*
pSkipList
,
SSkipListNode
*
pNode
);
void
tSkipListRemoveNode
(
SSkipList
*
pSkipList
,
SSkipListNode
*
pNode
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
src/util/src/tskiplist.c
浏览文件 @
5e6ed4b4
...
@@ -281,7 +281,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) {
...
@@ -281,7 +281,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) {
}
}
SSkipList
*
tSkipListCreate
(
uint8_t
maxLevel
,
uint8_t
keyType
,
uint8_t
keyLen
,
uint8_t
dupKey
,
uint8_t
lock
,
SSkipList
*
tSkipListCreate
(
uint8_t
maxLevel
,
uint8_t
keyType
,
uint8_t
keyLen
,
uint8_t
dupKey
,
uint8_t
lock
,
__sl_key_fn_t
fn
)
{
uint8_t
freeNode
,
__sl_key_fn_t
fn
)
{
SSkipList
*
pSkipList
=
(
SSkipList
*
)
calloc
(
1
,
sizeof
(
SSkipList
));
SSkipList
*
pSkipList
=
(
SSkipList
*
)
calloc
(
1
,
sizeof
(
SSkipList
));
if
(
pSkipList
==
NULL
)
{
if
(
pSkipList
==
NULL
)
{
return
NULL
;
return
NULL
;
...
@@ -291,9 +291,8 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui
...
@@ -291,9 +291,8 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint8_t keyLen, ui
maxLevel
=
MAX_SKIP_LIST_LEVEL
;
maxLevel
=
MAX_SKIP_LIST_LEVEL
;
}
}
pSkipList
->
keyInfo
=
(
SSkipListKeyInfo
){.
type
=
keyType
,
.
len
=
keyLen
,
.
dupKey
=
dupKey
};
pSkipList
->
keyInfo
=
(
SSkipListKeyInfo
){.
type
=
keyType
,
.
len
=
keyLen
,
.
dupKey
=
dupKey
,
.
freeNode
=
freeNode
};
pSkipList
->
keyFn
=
fn
;
pSkipList
->
keyFn
=
fn
;
pSkipList
->
comparFn
=
getKeyComparator
(
keyType
);
pSkipList
->
comparFn
=
getKeyComparator
(
keyType
);
pSkipList
->
maxLevel
=
maxLevel
;
pSkipList
->
maxLevel
=
maxLevel
;
pSkipList
->
level
=
1
;
pSkipList
->
level
=
1
;
...
@@ -348,12 +347,15 @@ void *tSkipListDestroy(SSkipList *pSkipList) {
...
@@ -348,12 +347,15 @@ void *tSkipListDestroy(SSkipList *pSkipList) {
pthread_rwlock_wrlock
(
pSkipList
->
lock
);
pthread_rwlock_wrlock
(
pSkipList
->
lock
);
}
}
SSkipListNode
*
pNode
=
SL_GET_FORWARD_POINTER
(
pSkipList
->
pHead
,
0
);
// pSkipList->pHead.pForward[0];
SSkipListNode
*
pNode
=
SL_GET_FORWARD_POINTER
(
pSkipList
->
pHead
,
0
);
while
(
pNode
)
{
while
(
pNode
)
{
SSkipListNode
*
pTemp
=
pNode
;
SSkipListNode
*
pTemp
=
pNode
;
pNode
=
SL_GET_FORWARD_POINTER
(
pNode
,
0
);
pNode
=
SL_GET_FORWARD_POINTER
(
pNode
,
0
);
tfree
(
pTemp
);
if
(
pSkipList
->
keyInfo
.
freeNode
)
{
tfree
(
pTemp
);
}
}
}
tfree
(
pSkipList
->
pHead
);
tfree
(
pSkipList
->
pHead
);
...
@@ -435,7 +437,11 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
...
@@ -435,7 +437,11 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
recordNodeEachLevel
(
pSkipList
,
level
);
recordNodeEachLevel
(
pSkipList
,
level
);
#endif
#endif
// clear pointer area
int32_t
level
=
SL_GET_NODE_LEVEL
(
pNode
);
int32_t
level
=
SL_GET_NODE_LEVEL
(
pNode
);
memset
(
pNode
,
0
,
SL_NODE_HEADER_SIZE
(
pNode
->
level
));
pNode
->
level
=
level
;
tSkipListDoInsert
(
pSkipList
,
forward
,
level
,
pNode
);
tSkipListDoInsert
(
pSkipList
,
forward
,
level
,
pNode
);
atomic_add_fetch_32
(
&
pSkipList
->
size
,
1
);
atomic_add_fetch_32
(
&
pSkipList
->
size
,
1
);
...
@@ -691,89 +697,6 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) {
...
@@ -691,89 +697,6 @@ void* tSkipListDestroyIter(SSkipListIterator* iter) {
// return NULL;
// return NULL;
//}
//}
//
//
// int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *),
// void *param) {
// (*pRes) = (SSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize);
// if (NULL == *pRes) {
// pError("error skiplist %p, malloc failed", pSkipList);
// return -1;
// }
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// SSkipListNode *pStartNode = pSkipList->pHead.pForward[0];
// int32_t num = 0;
//
// for (int32_t i = 0; i < pSkipList->nSize; ++i) {
// if (pStartNode == NULL) {
// pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1);
//#ifdef _DEBUG_VIEW
// tSkipListPrint(pSkipList, 1);
//#endif
// break;
// }
//
// if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) {
// (*pRes)[num++] = pStartNode;
// }
//
// pStartNode = pStartNode->pForward[0];
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
//
// if (num == 0) {
// free(*pRes);
// *pRes = NULL;
// } else if (num < pSkipList->nSize) { // free unused memory
// char *tmp = realloc((*pRes), num * POINTER_BYTES);
// assert(tmp != NULL);
//
// *pRes = (SSkipListNode **)tmp;
// }
//
// return num;
//}
//
// int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter) {
// if (pSkipList == NULL) {
// return -1;
// }
//
// iter->pSkipList = pSkipList;
// if (pSkipList->lock) {
// pthread_rwlock_rdlock(&pSkipList->lock);
// }
// iter->cur = NULL; // pSkipList->pHead.pForward[0];
// iter->num = pSkipList->size;
//
// if (pSkipList->lock) {
// pthread_rwlock_unlock(&pSkipList->lock);
// }
//
// return 0;
//}
//
// bool tSkipListIteratorNext(SSkipListIterator *iter) {
// if (iter->num == 0 || iter->pSkipList == NULL) {
// return false;
// }
//
// SSkipList *pSkipList = iter->pSkipList;
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// if (iter->cur == NULL) {
// iter->cur = pSkipList->pHead.pForward[0];
// } else {
// iter->cur = iter->cur->pForward[0];
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return iter->cur != NULL;
//}
//
// SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter) { return iter->cur; }
//
// int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) {
// int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) {
// pSkipList->state.queryCount++;
// pSkipList->state.queryCount++;
// SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
// SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
...
@@ -841,7 +764,8 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
...
@@ -841,7 +764,8 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
}
}
SSkipListNode
*
p
=
SL_GET_FORWARD_POINTER
(
pSkipList
->
pHead
,
nlevel
-
1
);
SSkipListNode
*
p
=
SL_GET_FORWARD_POINTER
(
pSkipList
->
pHead
,
nlevel
-
1
);
int32_t
id
=
1
;
int32_t
id
=
1
;
while
(
p
)
{
while
(
p
)
{
char
*
key
=
SL_GET_NODE_KEY
(
pSkipList
,
p
);
char
*
key
=
SL_GET_NODE_KEY
(
pSkipList
,
p
);
...
...
src/util/tests/cacheTest.cpp
浏览文件 @
5e6ed4b4
...
@@ -125,7 +125,7 @@ TEST(testCase, cache_resize_test) {
...
@@ -125,7 +125,7 @@ TEST(testCase, cache_resize_test) {
}
}
uint64_t
endTime
=
taosGetTimestampUs
();
uint64_t
endTime
=
taosGetTimestampUs
();
printf
(
"add
10,000,000 object cost:%lld us, avg:%f us
\n
"
,
endTime
-
startTime
,
(
endTime
-
startTime
)
/
(
double
)
num
);
printf
(
"add
%d object cost:%lld us, avg:%f us
\n
"
,
num
,
endTime
-
startTime
,
(
endTime
-
startTime
)
/
(
double
)
num
);
startTime
=
taosGetTimestampUs
();
startTime
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
...
@@ -134,7 +134,7 @@ TEST(testCase, cache_resize_test) {
...
@@ -134,7 +134,7 @@ TEST(testCase, cache_resize_test) {
assert
(
k
!=
0
);
assert
(
k
!=
0
);
}
}
endTime
=
taosGetTimestampUs
();
endTime
=
taosGetTimestampUs
();
printf
(
"retrieve
10,000,000 object cost:%lld us,avg:%f
\n
"
,
endTime
-
startTime
,
(
endTime
-
startTime
)
/
(
double
)
num
);
printf
(
"retrieve
%d object cost:%lld us,avg:%f
\n
"
,
num
,
endTime
-
startTime
,
(
endTime
-
startTime
)
/
(
double
)
num
);
taosCacheCleanup
(
pCache
);
taosCacheCleanup
(
pCache
);
taosMsleep
(
20000
);
taosMsleep
(
20000
);
...
...
src/util/tests/skiplistTest.cpp
0 → 100644
浏览文件 @
5e6ed4b4
#include <gtest/gtest.h>
#include <limits.h>
#include <taosdef.h>
#include <iostream>
#include "taosmsg.h"
#include "tskiplist.h"
#include "ttime.h"
#include "tutil.h"
namespace
{
char
*
getkey
(
const
void
*
data
)
{
return
(
char
*
)(
data
);
}
void
doubleSkipListTest
()
{
SSkipList
*
pSkipList
=
tSkipListCreate
(
10
,
TSDB_DATA_TYPE_DOUBLE
,
sizeof
(
double
),
0
,
false
,
true
,
getkey
);
double
doubleVal
[
1000
]
=
{
0
};
int32_t
size
=
20000
;
printf
(
"generated %d keys is:
\n
"
,
size
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
if
(
i
<
1000
)
{
doubleVal
[
i
]
=
i
*
0.997
;
}
int32_t
level
=
0
;
int32_t
size
=
0
;
tSkipListRandNodeInfo
(
pSkipList
,
&
level
,
&
size
);
auto
d
=
(
SSkipListNode
*
)
calloc
(
1
,
size
+
sizeof
(
double
)
*
2
);
d
->
level
=
level
;
double
*
key
=
(
double
*
)
SL_GET_NODE_KEY
(
pSkipList
,
d
);
key
[
0
]
=
i
*
0.997
;
key
[
1
]
=
i
*
0.997
;
tSkipListPut
(
pSkipList
,
d
);
}
printf
(
"the first level of skip list is:
\n
"
);
tSkipListPrint
(
pSkipList
,
1
);
#if 0
SSkipListNode **pNodes = NULL;
SSkipListKey sk;
for (int32_t i = 0; i < 100; ++i) {
sk.nType = TSDB_DATA_TYPE_DOUBLE;
int32_t idx = abs((i * rand()) % 1000);
sk.dKey = doubleVal[idx];
int32_t size = tSkipListGets(pSkipList, &sk, &pNodes);
printf("the query result size is: %d\n", size);
for (int32_t j = 0; j < size; ++j) {
printf("the result is: %lf\n", pNodes[j]->key.dKey);
}
if (size > 0) {
tfree(pNodes);
}
}
#endif
printf
(
"double test end...
\n
"
);
tSkipListDestroy
(
pSkipList
);
}
void
stringKeySkiplistTest
()
{
const
int32_t
max_key_size
=
12
;
SSkipList
*
pSkipList
=
tSkipListCreate
(
10
,
TSDB_DATA_TYPE_BINARY
,
max_key_size
,
0
,
false
,
true
,
getkey
);
int32_t
level
=
0
;
int32_t
headsize
=
0
;
tSkipListRandNodeInfo
(
pSkipList
,
&
level
,
&
headsize
);
auto
pNode
=
(
SSkipListNode
*
)
calloc
(
1
,
headsize
+
max_key_size
+
sizeof
(
double
));
pNode
->
level
=
level
;
char
*
d
=
SL_GET_NODE_DATA
(
pNode
);
strncpy
(
d
,
"nyse"
,
5
);
*
(
double
*
)(
d
+
max_key_size
)
=
12
;
tSkipListPut
(
pSkipList
,
pNode
);
tSkipListRandNodeInfo
(
pSkipList
,
&
level
,
&
headsize
);
pNode
=
(
SSkipListNode
*
)
calloc
(
1
,
headsize
+
max_key_size
+
sizeof
(
double
));
pNode
->
level
=
level
;
d
=
SL_GET_NODE_DATA
(
pNode
);
strncpy
(
d
,
"beijing"
,
8
);
*
(
double
*
)(
d
+
max_key_size
)
=
911
;
tSkipListPut
(
pSkipList
,
pNode
);
#if 0
SSkipListNode **pRes = NULL;
int32_t ret = tSkipListGets(pSkipList, &key1, &pRes);
assert(ret == 1);
assert(strcmp(pRes[0]->key.pz, "beijing") == 0);
assert(pRes[0]->key.nType == TSDB_DATA_TYPE_BINARY);
tSkipListDestroyKey(&key1);
tSkipListDestroyKey(&key);
tSkipListDestroy(pSkipList);
free(pRes);
#endif
tSkipListDestroy
(
pSkipList
);
int64_t
s
=
taosGetTimestampUs
();
pSkipList
=
tSkipListCreate
(
10
,
TSDB_DATA_TYPE_BINARY
,
20
,
0
,
false
,
true
,
getkey
);
char
k
[
256
]
=
{
0
};
int32_t
total
=
10000
;
for
(
int32_t
i
=
0
;
i
<
total
;
++
i
)
{
int32_t
n
=
sprintf
(
k
,
"abc_%d_%d"
,
i
,
i
);
tSkipListRandNodeInfo
(
pSkipList
,
&
level
,
&
headsize
);
auto
pNode
=
(
SSkipListNode
*
)
calloc
(
1
,
headsize
+
20
+
sizeof
(
double
));
pNode
->
level
=
level
;
char
*
d
=
SL_GET_NODE_DATA
(
pNode
);
strncpy
(
d
,
k
,
strlen
(
k
));
tSkipListPut
(
pSkipList
,
pNode
);
}
int64_t
e
=
taosGetTimestampUs
();
printf
(
"elapsed time:%lld us to insert %d data, avg:%f us
\n
"
,
(
e
-
s
),
total
,
(
double
)(
e
-
s
)
/
total
);
#if 0
SSkipListNode **pres = NULL;
s = taosGetTimestampMs();
for (int32_t j = 0; j < total; ++j) {
int32_t n = sprintf(k, "abc_%d_%d", j, j);
key = tSkipListCreateKey(TSDB_DATA_TYPE_BINARY, k, n);
int32_t num = tSkipListGets(pSkipList, &key, &pres);
assert(num > 0);
// tSkipListRemove(pSkipList, &key);
tSkipListRemoveNode(pSkipList, pres[0]);
if (num > 0) {
tfree(pres);
}
}
e = taosGetTimestampMs();
printf("elapsed time:%lldms\n", e - s);
#endif
tSkipListDestroy
(
pSkipList
);
}
void
skiplistPerformanceTest
()
{
SSkipList
*
pSkipList
=
tSkipListCreate
(
10
,
TSDB_DATA_TYPE_DOUBLE
,
sizeof
(
double
),
0
,
false
,
false
,
getkey
);
int32_t
size
=
900000
;
int64_t
prev
=
taosGetTimestampMs
();
int64_t
s
=
prev
;
int32_t
level
=
0
;
int32_t
headsize
=
0
;
int32_t
unit
=
MAX_SKIP_LIST_LEVEL
*
POINTER_BYTES
*
2
+
sizeof
(
double
)
*
2
+
sizeof
(
int16_t
);
char
*
total
=
(
char
*
)
calloc
(
1
,
unit
*
size
);
char
*
p
=
total
;
for
(
int32_t
i
=
size
;
i
>
0
;
--
i
)
{
tSkipListRandNodeInfo
(
pSkipList
,
&
level
,
&
headsize
);
SSkipListNode
*
d
=
(
SSkipListNode
*
)
p
;
p
+=
headsize
+
sizeof
(
double
)
*
2
;
d
->
level
=
level
;
double
*
v
=
(
double
*
)
SL_GET_NODE_DATA
(
d
);
v
[
0
]
=
i
*
0.997
;
v
[
1
]
=
i
*
0.997
;
tSkipListPut
(
pSkipList
,
d
);
if
(
i
%
100000
==
0
)
{
int64_t
cur
=
taosGetTimestampMs
();
int64_t
elapsed
=
cur
-
prev
;
printf
(
"add %d, elapsed time: %lld ms, avg elapsed:%f ms, total:%d
\n
"
,
100000
,
elapsed
,
elapsed
/
100000.0
,
i
);
prev
=
cur
;
}
}
int64_t
e
=
taosGetTimestampMs
();
printf
(
"total:%lld ms, avg:%f
\n
"
,
e
-
s
,
(
e
-
s
)
/
(
double
)
size
);
printf
(
"max level of skiplist:%d, actually level:%d
\n
"
,
pSkipList
->
maxLevel
,
pSkipList
->
level
);
assert
(
tSkipListGetSize
(
pSkipList
)
==
size
);
printf
(
"the level of skiplist is:
\n
"
);
// printf("level two------------------\n");
// tSkipListPrint(pSkipList, 2);
//
// printf("level three------------------\n");
// tSkipListPrint(pSkipList, 3);
//
// printf("level four------------------\n");
// tSkipListPrint(pSkipList, 4);
//
// printf("level nine------------------\n");
// tSkipListPrint(pSkipList, 10);
int64_t
st
=
taosGetTimestampMs
();
#if 0
for (int32_t i = 0; i < 100000; i += 1) {
key.dKey = i * 0.997;
tSkipListRemove(pSkipList, &key);
}
#endif
int64_t
et
=
taosGetTimestampMs
();
printf
(
"delete %d data from skiplist, elapased time:%"
PRIu64
"ms
\n
"
,
10000
,
et
-
st
);
assert
(
tSkipListGetSize
(
pSkipList
)
==
size
);
tSkipListDestroy
(
pSkipList
);
tfree
(
total
);
}
// todo not support duplicated key yet
void
duplicatedKeyTest
()
{
#if 0
SSkipListKey key;
key.nType = TSDB_DATA_TYPE_INT;
SSkipListNode **pNodes = NULL;
SSkipList *pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_INT, sizeof(int));
for (int32_t i = 0; i < 10000; ++i) {
for (int32_t j = 0; j < 5; ++j) {
key.i64Key = i;
tSkipListPut(pSkipList, "", &key, 1);
}
}
tSkipListPrint(pSkipList, 1);
for (int32_t i = 0; i < 100; ++i) {
key.i64Key = rand() % 1000;
int32_t size = tSkipListGets(pSkipList, &key, &pNodes);
assert(size == 5);
tfree(pNodes);
}
tSkipListDestroy(pSkipList);
#endif
}
}
// namespace
TEST
(
testCase
,
skiplist_test
)
{
assert
(
sizeof
(
SSkipListKey
)
==
8
);
srand
(
time
(
NULL
));
// stringKeySkiplistTest();
// doubleSkipListTest();
skiplistPerformanceTest
();
// duplicatedKeyTest();
// tSKipListQueryCond q;
// q.upperBndRelOptr = true;
// q.lowerBndRelOptr = true;
// q.upperBnd.nType = TSDB_DATA_TYPE_DOUBLE;
// q.lowerBnd.nType = TSDB_DATA_TYPE_DOUBLE;
// q.lowerBnd.dKey = 120;
// q.upperBnd.dKey = 171.989;
/*
int32_t size = tSkipListQuery(pSkipList, &q, &pNodes);
for (int32_t i = 0; i < size; ++i) {
printf("-----%lf\n", pNodes[i]->key.dKey);
}
printf("the range query result size is: %d\n", size);
tfree(pNodes);
SSkipListKey *pKeys = malloc(sizeof(SSkipListKey) * 20);
for (int32_t i = 0; i < 8; i += 2) {
pKeys[i].dKey = i * 0.997;
pKeys[i].nType = TSDB_DATA_TYPE_DOUBLE;
printf("%lf ", pKeys[i].dKey);
}
int32_t r = tSkipListPointQuery(pSkipList, pKeys, 8, EXCLUDE_POINT_QUERY, &pNodes);
printf("\nthe exclude query result is: %d\n", r);
for (int32_t i = 0; i < r; ++i) {
// printf("%lf ", pNodes[i]->key.dKey);
}
tfree(pNodes);
free(pKeys);*/
getchar
();
}
src/vnode/tsdb/inc/tsdbFile.h
浏览文件 @
5e6ed4b4
...
@@ -20,6 +20,7 @@
...
@@ -20,6 +20,7 @@
#include "dataformat.h"
#include "dataformat.h"
#include "taosdef.h"
#include "taosdef.h"
#include "tglobalcfg.h"
#include "tglobalcfg.h"
#include "tsdb.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
...
@@ -148,6 +149,8 @@ typedef struct {
...
@@ -148,6 +149,8 @@ typedef struct {
SCompCol
cols
[];
SCompCol
cols
[];
}
SCompData
;
}
SCompData
;
STsdbFileH
*
tsdbGetFile
(
tsdb_repo_t
*
pRepo
);
int
tsdbCopyBlockDataInFile
(
SFile
*
pOutFile
,
SFile
*
pInFile
,
SCompInfo
*
pCompInfo
,
int
idx
,
int
isLast
,
SDataCols
*
pCols
);
int
tsdbCopyBlockDataInFile
(
SFile
*
pOutFile
,
SFile
*
pInFile
,
SCompInfo
*
pCompInfo
,
int
idx
,
int
isLast
,
SDataCols
*
pCols
);
int
tsdbLoadCompIdx
(
SFileGroup
*
pGroup
,
void
*
buf
,
int
maxTables
);
int
tsdbLoadCompIdx
(
SFileGroup
*
pGroup
,
void
*
buf
,
int
maxTables
);
...
...
src/vnode/tsdb/src/tsdbMain.c
浏览文件 @
5e6ed4b4
...
@@ -583,6 +583,11 @@ STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) {
...
@@ -583,6 +583,11 @@ STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) {
return
tsdb
->
tsdbMeta
;
return
tsdb
->
tsdbMeta
;
}
}
STsdbFileH
*
tsdbGetFile
(
tsdb_repo_t
*
pRepo
)
{
STsdbRepo
*
tsdb
=
(
STsdbRepo
*
)
pRepo
;
return
tsdb
->
tsdbFileH
;
}
// Check the configuration and set default options
// Check the configuration and set default options
static
int32_t
tsdbCheckAndSetDefaultCfg
(
STsdbCfg
*
pCfg
)
{
static
int32_t
tsdbCheckAndSetDefaultCfg
(
STsdbCfg
*
pCfg
)
{
// Check precision
// Check precision
...
@@ -746,7 +751,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
...
@@ -746,7 +751,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
if
(
pTable
->
mem
==
NULL
)
{
if
(
pTable
->
mem
==
NULL
)
{
pTable
->
mem
=
(
SMemTable
*
)
calloc
(
1
,
sizeof
(
SMemTable
));
pTable
->
mem
=
(
SMemTable
*
)
calloc
(
1
,
sizeof
(
SMemTable
));
if
(
pTable
->
mem
==
NULL
)
return
-
1
;
if
(
pTable
->
mem
==
NULL
)
return
-
1
;
pTable
->
mem
->
pData
=
tSkipListCreate
(
5
,
TSDB_DATA_TYPE_TIMESTAMP
,
TYPE_BYTES
[
TSDB_DATA_TYPE_TIMESTAMP
],
0
,
0
,
getTupleKey
);
pTable
->
mem
->
pData
=
tSkipListCreate
(
5
,
TSDB_DATA_TYPE_TIMESTAMP
,
TYPE_BYTES
[
TSDB_DATA_TYPE_TIMESTAMP
],
0
,
0
,
0
,
getTupleKey
);
pTable
->
mem
->
keyFirst
=
INT64_MAX
;
pTable
->
mem
->
keyFirst
=
INT64_MAX
;
pTable
->
mem
->
keyLast
=
0
;
pTable
->
mem
->
keyLast
=
0
;
}
}
...
@@ -769,7 +774,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
...
@@ -769,7 +774,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
if
(
pTable
->
mem
==
NULL
)
{
if
(
pTable
->
mem
==
NULL
)
{
pTable
->
mem
=
(
SMemTable
*
)
calloc
(
1
,
sizeof
(
SMemTable
));
pTable
->
mem
=
(
SMemTable
*
)
calloc
(
1
,
sizeof
(
SMemTable
));
if
(
pTable
->
mem
==
NULL
)
return
-
1
;
if
(
pTable
->
mem
==
NULL
)
return
-
1
;
pTable
->
mem
->
pData
=
tSkipListCreate
(
5
,
TSDB_DATA_TYPE_TIMESTAMP
,
TYPE_BYTES
[
TSDB_DATA_TYPE_TIMESTAMP
],
0
,
0
,
getTupleKey
);
pTable
->
mem
->
pData
=
tSkipListCreate
(
5
,
TSDB_DATA_TYPE_TIMESTAMP
,
TYPE_BYTES
[
TSDB_DATA_TYPE_TIMESTAMP
],
0
,
0
,
0
,
getTupleKey
);
pTable
->
mem
->
keyFirst
=
INT64_MAX
;
pTable
->
mem
->
keyFirst
=
INT64_MAX
;
pTable
->
mem
->
keyLast
=
0
;
pTable
->
mem
->
keyLast
=
0
;
}
}
...
@@ -1189,7 +1194,7 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
...
@@ -1189,7 +1194,7 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
return
0
;
return
0
;
}
}
static
int
tsdbWriteBlockToFile
(
STsdbRepo
*
pRepo
,
SFileGroup
*
pGroup
,
SCompIdx
*
pIdx
,
SCompInfo
*
pCompInfo
,
SDataCols
*
pCols
,
SCompBlock
*
pCompBlock
,
SFile
*
lFile
,
int64_t
uid
)
{
int
tsdbWriteBlockToFile
(
STsdbRepo
*
pRepo
,
SFileGroup
*
pGroup
,
SCompIdx
*
pIdx
,
SCompInfo
*
pCompInfo
,
SDataCols
*
pCols
,
SCompBlock
*
pCompBlock
,
SFile
*
lFile
,
int64_t
uid
)
{
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
SCompData
*
pCompData
=
NULL
;
SCompData
*
pCompData
=
NULL
;
SFile
*
pFile
=
NULL
;
SFile
*
pFile
=
NULL
;
...
...
src/vnode/tsdb/src/tsdbMeta.c
浏览文件 @
5e6ed4b4
...
@@ -102,7 +102,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
...
@@ -102,7 +102,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
pTable
->
pIndex
=
pTable
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
sizeof
(
int64_t
),
1
,
0
,
getTupleKey
);
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
sizeof
(
int64_t
),
1
,
0
,
0
,
getTupleKey
);
}
}
tsdbAddTableToMeta
(
pMeta
,
pTable
,
false
);
tsdbAddTableToMeta
(
pMeta
,
pTable
,
false
);
...
@@ -207,7 +207,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
...
@@ -207,7 +207,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
super
->
tagSchema
=
tdDupSchema
(
pCfg
->
tagSchema
);
super
->
tagSchema
=
tdDupSchema
(
pCfg
->
tagSchema
);
super
->
tagVal
=
tdDataRowDup
(
pCfg
->
tagValues
);
super
->
tagVal
=
tdDataRowDup
(
pCfg
->
tagValues
);
super
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
sizeof
(
int64_t
),
1
,
super
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
sizeof
(
int64_t
),
1
,
0
,
getTupleKey
);
// Allow duplicate key, no lock
0
,
0
,
getTupleKey
);
// Allow duplicate key, no lock
if
(
super
->
pIndex
==
NULL
)
{
if
(
super
->
pIndex
==
NULL
)
{
tdFreeSchema
(
super
->
schema
);
tdFreeSchema
(
super
->
schema
);
...
...
src/vnode/tsdb/src/tsdbRead.c
浏览文件 @
5e6ed4b4
...
@@ -13,8 +13,9 @@
...
@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <tlog.h>
#include "os.h"
#include "os.h"
#include "tlog.h"
#include "tutil.h"
#include "tutil.h"
#include "../../../query/inc/qast.h"
#include "../../../query/inc/qast.h"
...
@@ -28,6 +29,11 @@
...
@@ -28,6 +29,11 @@
#define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC)
#define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC)
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
enum
{
QUERY_RANGE_LESS_EQUAL
=
0
,
QUERY_RANGE_GREATER_EQUAL
=
1
,
};
typedef
struct
SField
{
typedef
struct
SField
{
// todo need the definition
// todo need the definition
}
SField
;
}
SField
;
...
@@ -36,12 +42,12 @@ typedef struct SHeaderFileInfo {
...
@@ -36,12 +42,12 @@ typedef struct SHeaderFileInfo {
int32_t
fileId
;
int32_t
fileId
;
}
SHeaderFileInfo
;
}
SHeaderFileInfo
;
typedef
struct
SQuery
Hand
lePos
{
typedef
struct
SQuery
Fi
lePos
{
int32_t
fi
leI
d
;
int32_t
fid
;
int32_t
slot
;
int32_t
slot
;
int32_t
pos
;
int32_t
pos
;
int
32_t
fileIndex
;
int
64_t
lastKey
;
}
SQuery
Hand
lePos
;
}
SQuery
Fi
lePos
;
typedef
struct
SDataBlockLoadInfo
{
typedef
struct
SDataBlockLoadInfo
{
int32_t
fileListIndex
;
int32_t
fileListIndex
;
...
@@ -78,8 +84,12 @@ typedef struct STableCheckInfo {
...
@@ -78,8 +84,12 @@ typedef struct STableCheckInfo {
TSKEY
lastKey
;
TSKEY
lastKey
;
STable
*
pTableObj
;
STable
*
pTableObj
;
int64_t
offsetInHeaderFile
;
int64_t
offsetInHeaderFile
;
int32_t
numOfBlocks
;
//
int32_t numOfBlocks;
int32_t
start
;
int32_t
start
;
bool
checkFirstFileBlock
;
SCompIdx
*
compIndex
;
SCompBlock
*
pBlock
;
SCompBlock
*
pBlock
;
SSkipListIterator
*
iter
;
SSkipListIterator
*
iter
;
}
STableCheckInfo
;
}
STableCheckInfo
;
...
@@ -104,8 +114,8 @@ enum {
...
@@ -104,8 +114,8 @@ enum {
typedef
struct
STsdbQueryHandle
{
typedef
struct
STsdbQueryHandle
{
struct
STsdbRepo
*
pTsdb
;
struct
STsdbRepo
*
pTsdb
;
int8_t
model
;
// access model, single table model or multi-table model
int8_t
model
;
// access model, single table model or multi-table model
SQuery
Hand
lePos
cur
;
// current position
SQuery
Fi
lePos
cur
;
// current position
SQuery
Hand
lePos
start
;
// the start position, used for secondary/third iteration
SQuery
Fi
lePos
start
;
// the start position, used for secondary/third iteration
int32_t
unzipBufSize
;
int32_t
unzipBufSize
;
char
*
unzipBuffer
;
char
*
unzipBuffer
;
char
*
secondaryUnzipBuffer
;
char
*
secondaryUnzipBuffer
;
...
@@ -342,6 +352,337 @@ static bool hasMoreDataInCacheForSingleModel(STsdbQueryHandle* pHandle) {
...
@@ -342,6 +352,337 @@ static bool hasMoreDataInCacheForSingleModel(STsdbQueryHandle* pHandle) {
return
true
;
return
true
;
}
}
// todo dynamic get the daysperfile
static
int32_t
getFileIdFromKey
(
TSKEY
key
)
{
return
(
int32_t
)(
key
/
10
);
// set the starting fileId
}
static
int32_t
getFileCompInfo
(
STableCheckInfo
*
pCheckInfo
,
SFileGroup
*
fileGroup
)
{
tsdbLoadCompIdx
(
fileGroup
,
pCheckInfo
->
compIndex
,
10000
);
// todo set dynamic max tables
SCompIdx
*
compIndex
=
&
pCheckInfo
->
compIndex
[
pCheckInfo
->
tableId
.
tid
];
if
(
compIndex
->
len
==
0
||
compIndex
->
numOfSuperBlocks
==
0
)
{
// no data block in this file, try next file
}
else
{
tsdbLoadCompBlocks
(
fileGroup
,
compIndex
,
pCheckInfo
->
pBlock
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
binarySearchForBlockImpl
(
SCompBlock
*
pBlock
,
int32_t
numOfBlocks
,
TSKEY
skey
,
int32_t
order
)
{
int32_t
firstSlot
=
0
;
int32_t
lastSlot
=
numOfBlocks
-
1
;
int32_t
midSlot
=
firstSlot
;
while
(
1
)
{
numOfBlocks
=
lastSlot
-
firstSlot
+
1
;
midSlot
=
(
firstSlot
+
(
numOfBlocks
>>
1
));
if
(
numOfBlocks
==
1
)
break
;
if
(
skey
>
pBlock
[
midSlot
].
keyLast
)
{
if
(
numOfBlocks
==
2
)
break
;
if
((
order
==
TSQL_SO_DESC
)
&&
(
skey
<
pBlock
[
midSlot
+
1
].
keyFirst
))
break
;
firstSlot
=
midSlot
+
1
;
}
else
if
(
skey
<
pBlock
[
midSlot
].
keyFirst
)
{
if
((
order
==
TSQL_SO_ASC
)
&&
(
skey
>
pBlock
[
midSlot
-
1
].
keyLast
))
break
;
lastSlot
=
midSlot
-
1
;
}
else
{
break
;
// got the slot
}
}
return
midSlot
;
}
static
SDataBlockInfo
getTrueBlockInfo
(
STsdbQueryHandle
*
pHandle
,
STableCheckInfo
*
pCheckInfo
)
{
SDataBlockInfo
info
=
{{
0
},
0
};
SCompBlock
*
pDiskBlock
=
&
pCheckInfo
->
pBlock
[
pHandle
->
cur
.
slot
];
info
.
window
.
skey
=
pDiskBlock
->
keyFirst
;
info
.
window
.
ekey
=
pDiskBlock
->
keyLast
;
info
.
size
=
pDiskBlock
->
numOfPoints
;
info
.
numOfCols
=
pDiskBlock
->
numOfCols
;
return
info
;
}
bool
moveToNextBlock
(
STsdbQueryHandle
*
pQueryHandle
,
int32_t
step
)
{
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
if
(
pQueryHandle
->
cur
.
fid
>=
0
)
{
int32_t
fileIndex
=
-
1
;
/*
* 1. ascending order. The last data block of data file
* 2. descending order. The first block of file
*/
if
((
step
==
QUERY_ASC_FORWARD_STEP
&&
(
pQueryHandle
->
cur
.
slot
==
pQueryHandle
->
numOfBlocks
-
1
))
||
(
step
==
QUERY_DESC_FORWARD_STEP
&&
(
pQueryHandle
->
cur
.
slot
==
0
)))
{
// temporarily keep the position value, in case of no data qualified when move forwards(backwards)
SQueryFilePos
save
=
pQueryHandle
->
cur
;
// fileIndex = getNextDataFileCompInfo(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step);
// first data block in the next file
if
(
fileIndex
>=
0
)
{
cur
->
slot
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pQueryHandle
->
numOfBlocks
-
1
;
cur
->
pos
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pQueryHandle
->
pBlock
[
cur
->
slot
].
numOfPoints
-
1
;
// return loadQaulifiedData(pQueryHandle);
}
else
{
// try data in cache
assert
(
cur
->
fid
==
-
1
);
if
(
step
==
QUERY_ASC_FORWARD_STEP
)
{
// TSKEY nextTimestamp =
// getQueryStartPositionInCache_rv(pQueryHandle, &pQueryHandle->cur.slot, &pQueryHandle->cur.pos, true);
// if (nextTimestamp < 0) {
// pQueryHandle->cur = save;
// }
//
// return (nextTimestamp > 0);
}
// no data to check for desc order query, restore the saved position value
pQueryHandle
->
cur
=
save
;
return
false
;
}
}
// next block in the same file
int32_t
fid
=
cur
->
fid
;
// fileIndex = vnodeGetVnodeHeaderFileIndex(&fid, pQueryHandle->order, &pQueryHandle->vnodeFileInfo);
cur
->
slot
+=
step
;
SCompBlock
*
pBlock
=
&
pQueryHandle
->
pBlock
[
cur
->
slot
];
cur
->
pos
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pBlock
->
numOfPoints
-
1
;
// return loadQaulifiedData(pQueryHandle);
}
else
{
// data in cache
return
hasMoreDataInCacheForSingleModel
(
pQueryHandle
);
}
}
int
vnodeBinarySearchKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
)
{
int
firstPos
,
lastPos
,
midPos
=
-
1
;
int
numOfPoints
;
TSKEY
*
keyList
;
if
(
num
<=
0
)
return
-
1
;
keyList
=
(
TSKEY
*
)
pValue
;
firstPos
=
0
;
lastPos
=
num
-
1
;
if
(
order
==
0
)
{
// find the first position which is smaller than the key
while
(
1
)
{
if
(
key
>=
keyList
[
lastPos
])
return
lastPos
;
if
(
key
==
keyList
[
firstPos
])
return
firstPos
;
if
(
key
<
keyList
[
firstPos
])
return
firstPos
-
1
;
numOfPoints
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfPoints
>>
1
)
+
firstPos
;
if
(
key
<
keyList
[
midPos
])
{
lastPos
=
midPos
-
1
;
}
else
if
(
key
>
keyList
[
midPos
])
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
else
{
// find the first position which is bigger than the key
while
(
1
)
{
if
(
key
<=
keyList
[
firstPos
])
return
firstPos
;
if
(
key
==
keyList
[
lastPos
])
return
lastPos
;
if
(
key
>
keyList
[
lastPos
])
{
lastPos
=
lastPos
+
1
;
if
(
lastPos
>=
num
)
return
-
1
;
else
return
lastPos
;
}
numOfPoints
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfPoints
>>
1
)
+
firstPos
;
if
(
key
<
keyList
[
midPos
])
{
lastPos
=
midPos
-
1
;
}
else
if
(
key
>
keyList
[
midPos
])
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
return
midPos
;
}
static
void
filterDataInDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SArray
*
sa
)
{
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
pQueryHandle
->
activeIndex
);
SDataBlockInfo
blockInfo
=
getTrueBlockInfo
(
pQueryHandle
,
pCheckInfo
);
int32_t
endPos
=
cur
->
pos
;
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
&&
pQueryHandle
->
window
.
ekey
>
blockInfo
.
window
.
ekey
)
{
endPos
=
blockInfo
.
size
-
1
;
pQueryHandle
->
realNumOfRows
=
endPos
-
cur
->
pos
+
1
;
}
else
if
(
!
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
&&
pQueryHandle
->
window
.
ekey
<
blockInfo
.
window
.
skey
)
{
endPos
=
0
;
pQueryHandle
->
realNumOfRows
=
cur
->
pos
+
1
;
}
else
{
// endPos = vnodeBinarySearchKey(pQueryHandle->tsBuf->data, blockInfo.size, pQueryHandle->window.ekey, pQueryHandle->order);
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
))
{
if
(
endPos
<
cur
->
pos
)
{
pQueryHandle
->
realNumOfRows
=
0
;
return
;
}
else
{
pQueryHandle
->
realNumOfRows
=
endPos
-
cur
->
pos
;
}
}
else
{
if
(
endPos
>
cur
->
pos
)
{
pQueryHandle
->
realNumOfRows
=
0
;
return
;
}
else
{
pQueryHandle
->
realNumOfRows
=
cur
->
pos
-
endPos
;
}
}
}
int32_t
start
=
MIN
(
cur
->
pos
,
endPos
);
// move the data block in the front to data block if needed
if
(
start
!=
0
)
{
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
sa
);
++
i
)
{
int16_t
colId
=
*
(
int16_t
*
)
taosArrayGet
(
sa
,
i
);
for
(
int32_t
j
=
0
;
j
<
numOfCols
;
++
j
)
{
SColumnInfoEx
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
j
);
if
(
pCol
->
info
.
colId
==
colId
)
{
memmove
(
pCol
->
pData
,
((
char
*
)
pCol
->
pData
)
+
pCol
->
info
.
bytes
*
start
,
pQueryHandle
->
realNumOfRows
*
pCol
->
info
.
bytes
);
break
;
}
}
}
}
assert
(
pQueryHandle
->
realNumOfRows
<=
blockInfo
.
size
);
// forward(backward) the position for cursor
cur
->
pos
=
endPos
;
}
static
bool
getQualifiedDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
int32_t
type
)
{
STsdbFileH
*
pFileHandle
=
tsdbGetFile
(
pQueryHandle
->
pTsdb
);
int32_t
fid
=
getFileIdFromKey
(
pCheckInfo
->
lastKey
);
SFileGroup
*
fileGroup
=
tsdbSearchFGroup
(
pFileHandle
,
fid
);
if
(
fileGroup
==
NULL
)
{
return
false
;
}
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
TSKEY
key
=
pCheckInfo
->
lastKey
;
int32_t
index
=
-
1
;
// todo add iterator for filegroup
while
(
1
)
{
if
((
fid
=
getFileCompInfo
(
pCheckInfo
,
fileGroup
))
<
0
)
{
break
;
}
int32_t
tid
=
pCheckInfo
->
tableId
.
tid
;
index
=
binarySearchForBlockImpl
(
pCheckInfo
->
pBlock
,
pCheckInfo
->
compIndex
[
tid
].
numOfSuperBlocks
,
pQueryHandle
->
order
,
key
);
if
(
type
==
QUERY_RANGE_GREATER_EQUAL
)
{
if
(
key
<=
pCheckInfo
->
pBlock
[
index
].
keyLast
)
{
break
;
}
else
{
index
=
-
1
;
}
}
else
{
if
(
key
>=
pCheckInfo
->
pBlock
[
index
].
keyFirst
)
{
break
;
}
else
{
index
=
-
1
;
}
}
}
// failed to find qualified point in file, abort
if
(
index
==
-
1
)
{
return
false
;
}
assert
(
index
>=
0
&&
index
<
pQueryHandle
->
numOfBlocks
);
// load first data block into memory failed, caused by disk block error
bool
blockLoaded
=
false
;
SArray
*
sa
=
NULL
;
// todo no need to loaded at all
cur
->
slot
=
index
;
// sa = getDefaultLoadColumns(pQueryHandle, true);
if
(
tsdbLoadDataBlock
(
&
fileGroup
->
files
[
2
],
&
pCheckInfo
->
pBlock
[
cur
->
slot
],
1
,
fid
,
sa
)
==
0
)
{
blockLoaded
=
true
;
}
// dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files",
// GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx);
// failed to load data from disk, abort current query
if
(
blockLoaded
==
false
)
{
return
false
;
}
// todo search qualified points in blk, according to primary key (timestamp) column
// cur->pos = binarySearchForBlockImpl(ptsBuf->data, pBlocks->numOfPoints, key, pQueryHandle->order);
assert
(
cur
->
pos
>=
0
&&
cur
->
fid
>=
0
&&
cur
->
slot
>=
0
);
filterDataInDataBlock
(
pQueryHandle
,
sa
);
return
pQueryHandle
->
realNumOfRows
>
0
;
}
static
bool
hasMoreDataInFileForSingleTableModel
(
STsdbQueryHandle
*
pHandle
)
{
assert
(
pHandle
->
activeIndex
==
0
&&
taosArrayGetSize
(
pHandle
->
pTableCheckInfo
)
==
1
);
STsdbFileH
*
pFileHandle
=
tsdbGetFile
(
pHandle
->
pTsdb
);
SQueryFilePos
*
cur
=
&
pHandle
->
cur
;
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pHandle
->
pTableCheckInfo
,
pHandle
->
activeIndex
);
if
(
!
pCheckInfo
->
checkFirstFileBlock
)
{
pCheckInfo
->
checkFirstFileBlock
=
true
;
if
(
pFileHandle
!=
NULL
)
{
bool
found
=
getQualifiedDataBlock
(
pHandle
,
pCheckInfo
,
1
);
if
(
found
)
{
return
true
;
}
}
// no data in file, try cache
pHandle
->
cur
.
fid
=
-
1
;
return
hasMoreDataInCacheForSingleModel
(
pHandle
);
}
else
{
// move to next data block in file or in cache
return
moveToNextBlock
(
pHandle
,
1
);
}
}
static
bool
hasMoreDataInCacheForMultiModel
(
STsdbQueryHandle
*
pHandle
)
{
static
bool
hasMoreDataInCacheForMultiModel
(
STsdbQueryHandle
*
pHandle
)
{
size_t
numOfTables
=
taosArrayGetSize
(
pHandle
->
pTableCheckInfo
);
size_t
numOfTables
=
taosArrayGetSize
(
pHandle
->
pTableCheckInfo
);
assert
(
numOfTables
>
0
);
assert
(
numOfTables
>
0
);
...
@@ -372,7 +713,7 @@ static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) {
...
@@ -372,7 +713,7 @@ static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) {
bool
tsdbNextDataBlock
(
tsdb_query_handle_t
*
pQueryHandle
)
{
bool
tsdbNextDataBlock
(
tsdb_query_handle_t
*
pQueryHandle
)
{
STsdbQueryHandle
*
pHandle
=
(
STsdbQueryHandle
*
)
pQueryHandle
;
STsdbQueryHandle
*
pHandle
=
(
STsdbQueryHandle
*
)
pQueryHandle
;
if
(
pHandle
->
model
==
SINGLE_TABLE_MODEL
)
{
if
(
pHandle
->
model
==
SINGLE_TABLE_MODEL
)
{
return
hasMoreDataIn
CacheForSing
leModel
(
pHandle
);
return
hasMoreDataIn
FileForSingleTab
leModel
(
pHandle
);
}
else
{
}
else
{
return
hasMoreDataInCacheForMultiModel
(
pHandle
);
return
hasMoreDataInCacheForMultiModel
(
pHandle
);
}
}
...
@@ -704,8 +1045,6 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
...
@@ -704,8 +1045,6 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
STable
*
pTable
=
(
STable
*
)(
SL_GET_NODE_DATA
((
SSkipListNode
*
)
pNode
));
STable
*
pTable
=
(
STable
*
)(
SL_GET_NODE_DATA
((
SSkipListNode
*
)
pNode
));
char
buf
[
TSDB_MAX_TAGS_LEN
]
=
{
0
};
char
*
val
=
dataRowTuple
(
pTable
->
tagVal
);
// todo not only the first column
char
*
val
=
dataRowTuple
(
pTable
->
tagVal
);
// todo not only the first column
int8_t
type
=
pInfo
->
sch
.
type
;
int8_t
type
=
pInfo
->
sch
.
type
;
...
@@ -765,9 +1104,11 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond
...
@@ -765,9 +1104,11 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond
// query according to the binary expression
// query according to the binary expression
SSyntaxTreeFilterSupporter
s
=
{.
pTagSchema
=
stcol
,
.
numOfTags
=
schemaNCols
(
pSTable
->
tagSchema
)};
SSyntaxTreeFilterSupporter
s
=
{.
pTagSchema
=
stcol
,
.
numOfTags
=
schemaNCols
(
pSTable
->
tagSchema
)};
SBinaryFilterSupp
supp
=
{.
fp
=
(
__result_filter_fn_t
)
tSkipListNodeFilterCallback
,
SBinaryFilterSupp
supp
=
{
.
fp
=
(
__result_filter_fn_t
)
tSkipListNodeFilterCallback
,
.
setupInfoFn
=
(
__do_filter_suppl_fn_t
)
filterPrepare
,
.
setupInfoFn
=
(
__do_filter_suppl_fn_t
)
filterPrepare
,
.
pExtInfo
=
&
s
};
.
pExtInfo
=
&
s
};
tSQLBinaryExprTraverse
(
pExpr
,
pSTable
->
pIndex
,
pRes
,
&
supp
);
tSQLBinaryExprTraverse
(
pExpr
,
pSTable
->
pIndex
,
pRes
,
&
supp
);
tSQLBinaryExprDestroy
(
&
pExpr
,
tSQLListTraverseDestroyInfo
);
tSQLBinaryExprDestroy
(
&
pExpr
,
tSQLListTraverseDestroyInfo
);
...
...
tests/examples/c/demo.c
浏览文件 @
5e6ed4b4
...
@@ -46,7 +46,8 @@ int main(int argc, char *argv[]) {
...
@@ -46,7 +46,8 @@ int main(int argc, char *argv[]) {
}
}
printf
(
"success to connect to server
\n
"
);
printf
(
"success to connect to server
\n
"
);
int32_t
code
=
taos_query
(
taos
,
"select * from test.t1"
);
// int32_t code = taos_query(taos, "insert into test.tm2 values(now, 1)(now+1m,2)(now+2m,3) (now+3m, 4) (now+4m, 5);");
int32_t
code
=
taos_query
(
taos
,
"insert into test.tm2 values(now, 99)"
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
printf
(
"failed to execute query, reason:%s
\n
"
,
taos_errstr
(
taos
));
printf
(
"failed to execute query, reason:%s
\n
"
,
taos_errstr
(
taos
));
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录