Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
576f7703
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
576f7703
编写于
3月 04, 2020
作者:
H
hzcheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more
上级
5ffd4207
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
882 addition
and
700 deletion
+882
-700
src/util/inc/tarray.h
src/util/inc/tarray.h
+91
-0
src/util/inc/tskiplist.h
src/util/inc/tskiplist.h
+109
-93
src/util/src/tarray.c
src/util/src/tarray.c
+117
-0
src/util/src/tskiplist.c
src/util/src/tskiplist.c
+565
-607
未找到文件。
src/util/inc/tarray.h
0 → 100644
浏览文件 @
576f7703
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TAOSARRAY_H
#define TDENGINE_TAOSARRAY_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "os.h"
#define TARRAY_MIN_SIZE 8
#define TARRAY_GET_ELEM(array, index) ((array)->pData + (index) * (array)->elemSize)
typedef
struct
SArray
{
size_t
size
;
size_t
capacity
;
size_t
elemSize
;
void
*
pData
;
}
SArray
;
/**
*
* @param size
* @param elemSize
* @return
*/
void
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
);
/**
*
* @param pArray
* @param pData
* @return
*/
void
*
taosArrayPush
(
SArray
*
pArray
,
void
*
pData
);
/**
*
* @param pArray
*/
void
taosArrayPop
(
SArray
*
pArray
);
/**
*
* @param pArray
* @param index
* @return
*/
void
*
taosArrayGet
(
SArray
*
pArray
,
size_t
index
);
/**
*
* @param pArray
* @return
*/
size_t
taosArrayGetSize
(
SArray
*
pArray
);
/**
*
* @param pArray
* @param index
* @param pData
*/
void
taosArrayInsert
(
SArray
*
pArray
,
int32_t
index
,
void
*
pData
);
/**
*
* @param pArray
*/
void
taosArrayDestory
(
SArray
*
pArray
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TAOSARRAY_H
src/util/inc/tskiplist.h
浏览文件 @
576f7703
...
...
@@ -20,59 +20,62 @@
extern
"C"
{
#endif
#define MAX_SKIP_LIST_LEVEL 20
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include "os.h"
#include "ttypes.h"
#include "tarray.h"
/*
* key of each node
* todo move to as the global structure in all search codes...
*/
#define MAX_SKIP_LIST_LEVEL 15
#define SKIP_LIST_RECORD_PERFORMANCE 0
typedef
char
*
SSkipListKey
;
typedef
char
*
(
*
__sl_key_fn_t
)(
const
void
*
);
/**
* the format of skip list node is as follows:
* +------------+-----------------------+------------------------+-----+------+
* | node level | forward pointer array | backward pointer array | key | data |
* +------------+-----------------------+------------------------+-----+------+
* the skiplist node is located in a consecutive memory area, key will not be copy to skip list
*/
typedef
struct
SSkipListNode
{
uint8_t
level
;
}
SSkipListNode
;
const
static
size_t
SKIP_LIST_STR_KEY_LENGTH_THRESHOLD
=
15
;
typedef
tVariant
tSkipListKey
;
typedef
enum
tSkipListPointQueryType
{
INCLUDE_POINT_QUERY
,
EXCLUDE_POINT_QUERY
,
}
tSkipListPointQueryType
;
#define SL_NODE_HEADER_SIZE(_l) (sizeof(SSkipListNode) + ((_l) << 1u) * POINTER_BYTES)
typedef
struct
tSkipListNode
{
uint16_t
nLevel
;
char
*
pData
;
#define SL_GET_FORWARD_POINTER(n, _l) ((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode)))[(_l)]
#define SL_GET_BACKWARD_POINTER(n, _l) \
((SSkipListNode **)((char *)(n) + sizeof(SSkipListNode) + ((n)->level) * POINTER_BYTES))[(_l)]
struct
tSkipListNode
**
pForward
;
struct
tSkipListNode
**
pBackward
;
#define SL_GET_NODE_DATA(n) ((char*)(n) + SL_NODE_HEADER_SIZE((n)->level))
#define SL_GET_NODE_KEY(s, n) ((s)->keyFn(SL_GET_NODE_DATA(n)))
tSkipListKey
key
;
}
tSkipListNode
;
#define SL_GET_NODE_LEVEL(n) *(int32_t *)((n))
/*
* @version 0.
2
* @version 0.
3
* @date 2017/11/12
* the simple version of SkipList.
* the simple version of skip list.
*
* for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate
* deterministic result. Later, we will remove the lock in SkipList to further
* enhance the performance. In this case, one should use the concurrent skip list (by
* using michael-scott algorithm) instead of this simple version in a multi-thread
* environment, to achieve higher performance of read/write operations.
* deterministic result. Later, we will remove the lock in SkipList to further enhance the performance.
* In this case, one should use the concurrent skip list (by using michael-scott algorithm) instead of
* this simple version in a multi-thread environment, to achieve higher performance of read/write operations.
*
* Note: Duplicated primary key situation.
* In case of duplicated primary key, two ways can be employed to handle this situation:
* 1. add as normal insertion with
out special process.
* 2. add an overflow pointer at each list node, all nodes with the same key will be added
*
in the overflow pointer.
In this case, the total steps of each search will be reduced significantly.
* 1. add as normal insertion without special process.
* 2. add an overflow pointer at each list node, all nodes with the same key will be added
in the overflow pointer.
* In this case, the total steps of each search will be reduced significantly.
* Currently, we implement the skip list in a line with the first means, maybe refactor it soon.
*
* Memory consumption: the memory alignment causes many memory wasted. So, employ a memory
* pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs.
*
* 3. use the iterator pattern to refactor all routines to make it more clean
*/
// state struct, record following information:
...
...
@@ -81,7 +84,7 @@ typedef struct tSkipListNode {
// avg search rsp time, for latest 1000 queries
// total memory size
typedef
struct
tSkipListState
{
// in bytes, sizeof(
tSkipList)+sizeof(tSkipListNode)*t
SkipList->nSize
// in bytes, sizeof(
SSkipList)+sizeof(SSkipListNode)*S
SkipList->nSize
uint64_t
nTotalMemSize
;
uint64_t
nLevelNodeCnt
[
MAX_SKIP_LIST_LEVEL
];
uint64_t
queryCount
;
// total query count
...
...
@@ -101,68 +104,95 @@ typedef struct tSkipListState {
uint64_t
nTotalElapsedTimeForInsert
;
}
tSkipListState
;
typedef
struct
tSkipList
{
tSkipListNode
pHead
;
uint64_t
nSize
;
uint16_t
nMaxLevel
;
uint16_t
nLevel
;
uint16_t
keyType
;
uint16_t
nMaxKeyLen
;
typedef
struct
SSkipListKeyInfo
{
uint8_t
dupKey
:
2
;
// if allow duplicated key in the skip list
uint8_t
type
:
6
;
// key type
uint8_t
len
;
// maximum key length, used in case of string key
}
SSkipListKeyInfo
;
typedef
struct
SSkipList
{
__compar_fn_t
comparFn
;
__sl_key_fn_t
keyFn
;
uint32_t
size
;
uint8_t
maxLevel
;
uint8_t
level
;
SSkipListKeyInfo
keyInfo
;
pthread_rwlock_t
*
lock
;
SSkipListNode
*
pHead
;
#if SKIP_LIST_RECORD_PERFORMANCE
tSkipListState
state
;
// skiplist state
#endif
__compar_fn_t
comparator
;
pthread_rwlock_t
lock
;
// will be removed soon
tSkipListState
state
;
// skiplist state
}
tSkipList
;
}
SSkipList
;
/*
* iterate the skiplist
* this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may
* continue iterating the skiplist, so add the reference count for skiplist
* TODO add the ref for skiplist when one iterator is created
* TODO add the ref for skip
list when one iterator is created
*/
typedef
struct
SSkipListIterator
{
t
SkipList
*
pSkipList
;
t
SkipListNode
*
cur
;
S
SkipList
*
pSkipList
;
S
SkipListNode
*
cur
;
int64_t
num
;
}
SSkipListIterator
;
/*
* query condition structure to denote the range query
* todo merge the point query cond with range query condition
/**
*
* @param nMaxLevel maximum skip list level
* @param keyType type of key
* @param dupKey allow the duplicated key in the skip list
* @return
*/
typedef
struct
tSKipListQueryCond
{
// when the upper bounding == lower bounding, it is a point query
tSkipListKey
lowerBnd
;
tSkipListKey
upperBnd
;
int32_t
lowerBndRelOptr
;
// relation operator to denote if lower bound is
int32_t
upperBndRelOptr
;
// included or not
}
tSKipListQueryCond
;
tSkipList
*
tSkipListCreate
(
int16_t
nMaxLevel
,
int16_t
keyType
,
int16_t
nMaxKeyLen
);
void
*
tSkipListDestroy
(
tSkipList
*
pSkipList
);
// create skip list key
tSkipListKey
tSkipListCreateKey
(
int32_t
type
,
char
*
val
,
size_t
keyLength
);
SSkipList
*
tSkipListCreate
(
uint8_t
nMaxLevel
,
uint8_t
keyType
,
uint8_t
keyLen
,
uint8_t
dupKey
,
uint8_t
threadsafe
,
__sl_key_fn_t
fn
);
// destroy skip list key
void
tSkipListDestroyKey
(
tSkipListKey
*
pKey
);
/**
*
* @param pSkipList
* @return NULL will always be returned
*/
void
*
tSkipListDestroy
(
SSkipList
*
pSkipList
);
// put data into skiplist
tSkipListNode
*
tSkipListPut
(
tSkipList
*
pSkipList
,
void
*
pData
,
tSkipListKey
*
pKey
,
int32_t
insertIdenticalKey
);
/**
*
* @param pSkipList
* @param level
* @param headSize
*/
void
tSkipListRandNodeInfo
(
SSkipList
*
pSkipList
,
int32_t
*
level
,
int32_t
*
headSize
);
/*
* get only *one* node of which key is equalled to pKey, even there are more
* than one nodes are of the same key
/**
* put the skip list node into the skip list.
* If failed, NULL will be returned, otherwise, the pNode will be returned.
*
* @param pSkipList
* @param pNode
* @return
*/
tSkipListNode
*
tSkipListGetOne
(
tSkipList
*
pSkipList
,
tSkipListKey
*
pKey
);
SSkipListNode
*
tSkipListPut
(
SSkipList
*
pSkipList
,
SSkipListNode
*
pNode
);
/*
* get all data with the same keys
/**
* get only *one* node of which key is equalled to pKey, even there are more than one nodes are of the same key
*
* @param pSkipList
* @param pKey
* @param keyType
* @return
*/
int32_t
tSkipListGets
(
tSkipList
*
pSkipList
,
tSkipListKey
*
pKey
,
tSkipListNode
***
pRes
);
SArray
*
tSkipListGet
(
SSkipList
*
pSkipList
,
SSkipListKey
pKey
,
int16_t
keyType
);
int32_t
tSkipListIterateList
(
tSkipList
*
pSkipList
,
tSkipListNode
***
pRes
,
bool
(
*
fp
)(
tSkipListNode
*
,
void
*
),
/**
*
* @param pSkipList
* @param pRes
* @param fp
* @param param
* @return
*/
int32_t
tSkipListIterateList
(
SSkipList
*
pSkipList
,
SSkipListNode
***
pRes
,
bool
(
*
fp
)(
SSkipListNode
*
,
void
*
),
void
*
param
);
/*
...
...
@@ -173,30 +203,16 @@ int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (
* true: one node has been removed
* false: no node has been removed
*/
bool
tSkipListRemove
(
tSkipList
*
pSkipList
,
t
SkipListKey
*
pKey
);
bool
tSkipListRemove
(
SSkipList
*
pSkipList
,
S
SkipListKey
*
pKey
);
/*
* remove the specified node in parameters
*/
void
tSkipListRemoveNode
(
tSkipList
*
pSkipList
,
tSkipListNode
*
pNode
);
// for debug purpose only
void
tSkipListPrint
(
tSkipList
*
pSkipList
,
int16_t
nlevel
);
/*
* range query & single point query function
*/
int32_t
tSkipListQuery
(
tSkipList
*
pSkipList
,
tSKipListQueryCond
*
pQueryCond
,
tSkipListNode
***
pResult
);
/*
* include/exclude point query
*/
int32_t
tSkipListPointQuery
(
tSkipList
*
pSkipList
,
tSkipListKey
*
pKey
,
int32_t
numOfKey
,
tSkipListPointQueryType
type
,
tSkipListNode
***
pResult
);
void
tSkipListRemoveNode
(
SSkipList
*
pSkipList
,
SSkipListNode
*
pNode
);
int32_t
tSkipListIteratorReset
(
t
SkipList
*
pSkipList
,
SSkipListIterator
*
iter
);
bool
tSkipListIteratorNext
(
SSkipListIterator
*
iter
);
t
SkipListNode
*
tSkipListIteratorGet
(
SSkipListIterator
*
iter
);
int32_t
tSkipListIteratorReset
(
S
SkipList
*
pSkipList
,
SSkipListIterator
*
iter
);
bool
tSkipListIteratorNext
(
SSkipListIterator
*
iter
);
S
SkipListNode
*
tSkipListIteratorGet
(
SSkipListIterator
*
iter
);
#ifdef __cplusplus
}
...
...
src/util/src/tarray.c
0 → 100755
浏览文件 @
576f7703
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tarray.h"
void
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
)
{
assert
(
elemSize
>
0
);
if
(
size
<
TARRAY_MIN_SIZE
)
{
size
=
TARRAY_MIN_SIZE
;
}
SArray
*
pArray
=
calloc
(
1
,
sizeof
(
SArray
));
if
(
pArray
==
NULL
)
{
return
NULL
;
}
pArray
->
pData
=
calloc
(
size
,
elemSize
*
size
);
if
(
pArray
->
pData
==
NULL
)
{
free
(
pArray
);
return
NULL
;
}
pArray
->
capacity
=
size
;
pArray
->
elemSize
=
elemSize
;
return
pArray
;
}
static
void
taosArrayResize
(
SArray
*
pArray
)
{
assert
(
pArray
->
size
>=
pArray
->
capacity
);
size_t
size
=
pArray
->
capacity
;
size
=
(
size
<<
1u
);
void
*
tmp
=
realloc
(
pArray
->
pData
,
size
*
pArray
->
elemSize
);
if
(
tmp
==
NULL
)
{
// todo
}
pArray
->
pData
=
tmp
;
pArray
->
capacity
=
size
;
}
void
*
taosArrayPush
(
SArray
*
pArray
,
void
*
pData
)
{
if
(
pArray
==
NULL
||
pData
==
NULL
)
{
return
NULL
;
}
if
(
pArray
->
size
>=
pArray
->
capacity
)
{
taosArrayResize
(
pArray
);
}
void
*
dst
=
TARRAY_GET_ELEM
(
pArray
,
pArray
->
size
);
memcpy
(
dst
,
pData
,
pArray
->
elemSize
);
pArray
->
size
+=
1
;
return
dst
;
}
void
taosArrayPop
(
SArray
*
pArray
)
{
if
(
pArray
==
NULL
||
pArray
->
size
==
0
)
{
return
;
}
pArray
->
size
-=
1
;
}
void
*
taosArrayGet
(
SArray
*
pArray
,
size_t
index
)
{
assert
(
index
<
pArray
->
size
);
return
TARRAY_GET_ELEM
(
pArray
,
index
);
}
size_t
taosArrayGetSize
(
SArray
*
pArray
)
{
return
pArray
->
size
;
}
void
taosArrayInsert
(
SArray
*
pArray
,
int32_t
index
,
void
*
pData
)
{
if
(
pArray
==
NULL
||
pData
==
NULL
)
{
return
;
}
if
(
index
>=
pArray
->
size
)
{
taosArrayPush
(
pArray
,
pData
);
return
;
}
if
(
pArray
->
size
>=
pArray
->
capacity
)
{
taosArrayResize
(
pArray
);
}
void
*
dst
=
TARRAY_GET_ELEM
(
pArray
,
index
);
int32_t
remain
=
pArray
->
size
-
index
;
memmove
(
dst
+
pArray
->
elemSize
,
dst
,
pArray
->
elemSize
*
remain
);
memcpy
(
dst
,
pData
,
pArray
->
elemSize
);
pArray
->
size
+=
1
;
}
void
taosArrayDestory
(
SArray
*
pArray
)
{
if
(
pArray
==
NULL
)
{
return
;
}
free
(
pArray
->
pData
);
free
(
pArray
);
}
src/util/src/tskiplist.c
浏览文件 @
576f7703
...
...
@@ -15,49 +15,54 @@
#include "os.h"
#include "tlog.h"
#include "taosdef
.h"
// #include "tsdb
.h"
#include "tskiplist.h"
#include "tutil.h"
static
FORCE_INLINE
void
recordNodeEachLevel
(
tSkipList
*
pSkipList
,
int32_t
nLevel
)
{
// record link count in each level
for
(
int32_t
i
=
0
;
i
<
nLevel
;
++
i
)
{
static
FORCE_INLINE
void
recordNodeEachLevel
(
SSkipList
*
pSkipList
,
int32_t
level
)
{
// record link count in each level
#if SKIP_LIST_RECORD_PERFORMANCE
for
(
int32_t
i
=
0
;
i
<
level
;
++
i
)
{
pSkipList
->
state
.
nLevelNodeCnt
[
i
]
++
;
}
#endif
}
static
FORCE_INLINE
void
removeNodeEachLevel
(
tSkipList
*
pSkipList
,
int32_t
nLevel
)
{
for
(
int32_t
i
=
0
;
i
<
nLevel
;
++
i
)
{
static
FORCE_INLINE
void
removeNodeEachLevel
(
SSkipList
*
pSkipList
,
int32_t
level
)
{
#if SKIP_LIST_RECORD_PERFORMANCE
for
(
int32_t
i
=
0
;
i
<
level
;
++
i
)
{
pSkipList
->
state
.
nLevelNodeCnt
[
i
]
--
;
}
#endif
}
static
FORCE_INLINE
int32_t
getSkipListNodeRandomHeight
(
t
SkipList
*
pSkipList
)
{
static
FORCE_INLINE
int32_t
getSkipListNodeRandomHeight
(
S
SkipList
*
pSkipList
)
{
const
uint32_t
factor
=
4
;
int32_t
n
=
1
;
while
((
rand
()
%
factor
)
==
0
&&
n
<=
pSkipList
->
nM
axLevel
)
{
while
((
rand
()
%
factor
)
==
0
&&
n
<=
pSkipList
->
m
axLevel
)
{
n
++
;
}
return
n
;
}
static
FORCE_INLINE
int32_t
getSkipList
NodeLevel
(
t
SkipList
*
pSkipList
)
{
int32_t
nL
evel
=
getSkipListNodeRandomHeight
(
pSkipList
);
if
(
pSkipList
->
nS
ize
==
0
)
{
nL
evel
=
1
;
pSkipList
->
nL
evel
=
1
;
static
FORCE_INLINE
int32_t
getSkipList
RandLevel
(
S
SkipList
*
pSkipList
)
{
int32_t
l
evel
=
getSkipListNodeRandomHeight
(
pSkipList
);
if
(
pSkipList
->
s
ize
==
0
)
{
l
evel
=
1
;
pSkipList
->
l
evel
=
1
;
}
else
{
if
(
nLevel
>
pSkipList
->
nLevel
&&
pSkipList
->
nLevel
<
pSkipList
->
nM
axLevel
)
{
nLevel
=
(
++
pSkipList
->
nL
evel
);
if
(
level
>
pSkipList
->
level
&&
pSkipList
->
level
<
pSkipList
->
m
axLevel
)
{
level
=
(
++
pSkipList
->
l
evel
);
}
}
return
nL
evel
;
return
l
evel
;
}
void
tSkipListDoInsert
(
tSkipList
*
pSkipList
,
tSkipListNode
**
forward
,
int32_t
nLevel
,
t
SkipListNode
*
pNode
);
static
void
tSkipListDoInsert
(
SSkipList
*
pSkipList
,
SSkipListNode
**
forward
,
int32_t
level
,
S
SkipListNode
*
pNode
);
void
tSkipListDoRecordPut
(
tSkipList
*
pSkipList
)
{
void
tSkipListDoRecordPut
(
SSkipList
*
pSkipList
)
{
#if SKIP_LIST_RECORD_PERFORMANCE
const
int32_t
MAX_RECORD_NUM
=
1000
;
if
(
pSkipList
->
state
.
nInsertObjs
==
MAX_RECORD_NUM
)
{
...
...
@@ -67,61 +72,38 @@ void tSkipListDoRecordPut(tSkipList *pSkipList) {
}
else
{
pSkipList
->
state
.
nInsertObjs
++
;
}
#endif
}
int32_t
compareIntVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int64_t
lhs
=
((
tSkipListKey
*
)
pLeft
)
->
i64Key
;
int64_t
rhs
=
((
tSkipListKey
*
)
pRight
)
->
i64Key
;
DEFAULT_COMP
(
lhs
,
rhs
);
}
int32_t
compareIntDoubleVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int64_t
lhs
=
((
tSkipListKey
*
)
pLeft
)
->
i64Key
;
double
rhs
=
((
tSkipListKey
*
)
pRight
)
->
dKey
;
if
(
fabs
(
lhs
-
rhs
)
<
FLT_EPSILON
)
{
int32_t
compareInt32Val
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int32_t
ret
=
GET_INT32_VAL
(
pLeft
)
-
GET_INT32_VAL
(
pRight
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
return
(
lhs
>
rhs
)
?
1
:
-
1
;
return
ret
>
0
?
1
:
-
1
;
}
}
int32_t
compareDoubleIntVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
double
lhs
=
((
tSkipListKey
*
)
pLeft
)
->
dKey
;
int64_t
rhs
=
((
tSkipListKey
*
)
pRight
)
->
i64Key
;
if
(
fabs
(
lhs
-
rhs
)
<
FLT_EPSILON
)
{
int32_t
compareInt64Val
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int32_t
ret
=
GET_INT64_VAL
(
pLeft
)
-
GET_INT64_VAL
(
pRight
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
return
(
lhs
>
rhs
)
?
1
:
-
1
;
return
ret
>
0
?
1
:
-
1
;
}
}
int32_t
compare
Double
Val
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
double
ret
=
(((
tSkipListKey
*
)
pLeft
)
->
dKey
-
((
tSkipListKey
*
)
pRight
)
->
dKey
);
if
(
fabs
(
ret
)
<
FLT_EPSILON
)
{
int32_t
compare
Int16
Val
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int32_t
ret
=
GET_INT16_VAL
(
pLeft
)
-
GET_INT16_VAL
(
pRight
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
return
ret
>
0
?
1
:
-
1
;
}
}
int32_t
compareStrVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
tSkipListKey
*
pL
=
(
tSkipListKey
*
)
pLeft
;
tSkipListKey
*
pR
=
(
tSkipListKey
*
)
pRight
;
if
(
pL
->
nLen
==
0
&&
pR
->
nLen
==
0
)
{
return
0
;
}
//handle only one-side bound compare situation, there is only lower bound or only upper bound
if
(
pL
->
nLen
==
-
1
)
{
return
1
;
// no lower bound, lower bound is minimum, always return -1;
}
else
if
(
pR
->
nLen
==
-
1
)
{
return
-
1
;
// no upper bound, upper bound is maximum situation, always return 1;
}
int32_t
ret
=
strcmp
(((
tSkipListKey
*
)
pLeft
)
->
pz
,
((
tSkipListKey
*
)
pRight
)
->
pz
);
int32_t
compareInt8Val
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int32_t
ret
=
GET_INT8_VAL
(
pLeft
)
-
GET_INT8_VAL
(
pRight
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
...
...
@@ -129,716 +111,692 @@ int32_t compareStrVal(const void *pLeft, const void *pRight) {
}
}
int32_t
compareWStrVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
tSkipListKey
*
pL
=
(
tSkipListKey
*
)
pLeft
;
tSkipListKey
*
pR
=
(
tSkipListKey
*
)
pRight
;
if
(
pL
->
nLen
==
0
&&
pR
->
nLen
==
0
)
{
return
0
;
}
//handle only one-side bound compare situation, there is only lower bound or only upper bound
if
(
pL
->
nLen
==
-
1
)
{
return
1
;
// no lower bound, lower bound is minimum, always return -1;
}
else
if
(
pR
->
nLen
==
-
1
)
{
return
-
1
;
// no upper bound, upper bound is maximum situation, always return 1;
}
int32_t
compareIntDoubleVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
// int64_t lhs = ((SSkipListKey *)pLeft)->i64Key;
// double rhs = ((SSkipListKey *)pRight)->dKey;
// if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0;
// } else {
// return (lhs > rhs) ? 1 : -1;
// }
return
0
;
}
int32_t
ret
=
wcscmp
(((
tSkipListKey
*
)
pLeft
)
->
wpz
,
((
tSkipListKey
*
)
pRight
)
->
wpz
);
int32_t
compareDoubleIntVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
// double lhs = ((SSkipListKey *)pLeft)->dKey;
// int64_t rhs = ((SSkipListKey *)pRight)->i64Key;
// if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0;
// } else {
// return (lhs > rhs) ? 1 : -1;
// }
return
0
;
}
if
(
ret
==
0
)
{
int32_t
compareDoubleVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
double
ret
=
GET_DOUBLE_VAL
(
pLeft
)
-
GET_DOUBLE_VAL
(
pRight
);
if
(
fabs
(
ret
)
<
FLT_EPSILON
)
{
return
0
;
}
else
{
return
ret
>
0
?
1
:
-
1
;
}
}
static
__compar_fn_t
getKeyFilterComparator
(
tSkipList
*
pSkipList
,
int32_t
filterDataType
)
{
__compar_fn_t
comparator
=
NULL
;
int32_t
compareStrVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
// SSkipListKey *pL = (SSkipListKey *)pLeft;
// SSkipListKey *pR = (SSkipListKey *)pRight;
//
// if (pL->nLen == 0 && pR->nLen == 0) {
// return 0;
// }
//
// // handle only one-side bound compare situation, there is only lower bound or only upper bound
// if (pL->nLen == -1) {
// return 1; // no lower bound, lower bound is minimum, always return -1;
// } else if (pR->nLen == -1) {
// return -1; // no upper bound, upper bound is maximum situation, always return 1;
// }
//
// int32_t ret = strcmp(((SSkipListKey *)pLeft)->pz, ((SSkipListKey *)pRight)->pz);
//
// if (ret == 0) {
// return 0;
// } else {
// return ret > 0 ? 1 : -1;
// }
return
0
;
}
switch
(
pSkipList
->
keyType
)
{
int32_t
compareWStrVal
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
// SSkipListKey *pL = (SSkipListKey *)pLeft;
// SSkipListKey *pR = (SSkipListKey *)pRight;
//
// if (pL->nLen == 0 && pR->nLen == 0) {
// return 0;
// }
//
// // handle only one-side bound compare situation, there is only lower bound or only upper bound
// if (pL->nLen == -1) {
// return 1; // no lower bound, lower bound is minimum, always return -1;
// } else if (pR->nLen == -1) {
// return -1; // no upper bound, upper bound is maximum situation, always return 1;
// }
//
// int32_t ret = wcscmp(((SSkipListKey *)pLeft)->wpz, ((SSkipListKey *)pRight)->wpz);
//
// if (ret == 0) {
// return 0;
// } else {
// return ret > 0 ? 1 : -1;
// }
return
0
;
}
static
__compar_fn_t
getKeyFilterComparator
(
SSkipList
*
pSkipList
,
int32_t
filterDataType
)
{
__compar_fn_t
comparFn
=
NULL
;
switch
(
pSkipList
->
keyInfo
.
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_BIGINT
:
{
if
(
filterDataType
==
TSDB_DATA_TYPE_BIGINT
)
{
comparFn
=
compareInt64Val
;
break
;
}
}
case
TSDB_DATA_TYPE_BOOL
:
{
if
(
filterDataType
>=
TSDB_DATA_TYPE_BOOL
&&
filterDataType
<=
TSDB_DATA_TYPE_BIGINT
)
{
compar
ator
=
compareInt
Val
;
compar
Fn
=
compareInt32
Val
;
}
else
if
(
filterDataType
>=
TSDB_DATA_TYPE_FLOAT
&&
filterDataType
<=
TSDB_DATA_TYPE_DOUBLE
)
{
compar
ator
=
compareIntDoubleVal
;
compar
Fn
=
compareIntDoubleVal
;
}
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_DOUBLE
:
{
if
(
filterDataType
>=
TSDB_DATA_TYPE_BOOL
&&
filterDataType
<=
TSDB_DATA_TYPE_BIGINT
)
{
comparator
=
compareDoubleIntVal
;
}
else
if
(
filterDataType
>=
TSDB_DATA_TYPE_FLOAT
&&
filterDataType
<=
TSDB_DATA_TYPE_DOUBLE
)
{
comparator
=
compareDoubleVal
;
// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
// comparFn = compareDoubleIntVal;
// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
// comparFn = compareDoubleVal;
// }
if
(
filterDataType
==
TSDB_DATA_TYPE_DOUBLE
)
{
comparFn
=
compareDoubleVal
;
}
break
;
}
case
TSDB_DATA_TYPE_BINARY
:
compar
ator
=
compareStrVal
;
compar
Fn
=
compareStrVal
;
break
;
case
TSDB_DATA_TYPE_NCHAR
:
compar
ator
=
compareWStrVal
;
compar
Fn
=
compareWStrVal
;
break
;
default:
compar
ator
=
compareInt
Val
;
compar
Fn
=
compareInt32
Val
;
break
;
}
return
compar
ator
;
return
compar
Fn
;
}
static
__compar_fn_t
getKeyComparator
(
int32_t
keyType
)
{
__compar_fn_t
compar
ator
=
NULL
;
__compar_fn_t
compar
Fn
=
NULL
;
switch
(
keyType
)
{
case
TSDB_DATA_TYPE_TINYINT
:
comparFn
=
compareInt8Val
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
comparFn
=
compareInt16Val
;
break
;
case
TSDB_DATA_TYPE_INT
:
comparFn
=
compareInt32Val
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
comparFn
=
compareInt64Val
;
break
;
case
TSDB_DATA_TYPE_BOOL
:
compar
ator
=
compareInt
Val
;
compar
Fn
=
compareInt32
Val
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_DOUBLE
:
compar
ator
=
compareDoubleVal
;
compar
Fn
=
compareDoubleVal
;
break
;
case
TSDB_DATA_TYPE_BINARY
:
compar
ator
=
compareStrVal
;
compar
Fn
=
compareStrVal
;
break
;
case
TSDB_DATA_TYPE_NCHAR
:
compar
ator
=
compareWStrVal
;
compar
Fn
=
compareWStrVal
;
break
;
default:
compar
ator
=
compareInt
Val
;
compar
Fn
=
compareInt32
Val
;
break
;
}
return
compar
ator
;
return
compar
Fn
;
}
tSkipList
*
tSkipListCreate
(
int16_t
nMaxLevel
,
int16_t
keyType
,
int16_t
nMaxKeyLen
)
{
tSkipList
*
pSkipList
=
(
tSkipList
*
)
calloc
(
1
,
sizeof
(
tSkipList
));
SSkipList
*
tSkipListCreate
(
uint8_t
maxLevel
,
uint8_t
keyType
,
uint8_t
keyLen
,
uint8_t
dupKey
,
uint8_t
lock
,
__sl_key_fn_t
fn
)
{
SSkipList
*
pSkipList
=
(
SSkipList
*
)
calloc
(
1
,
sizeof
(
SSkipList
));
if
(
pSkipList
==
NULL
)
{
return
NULL
;
}
pSkipList
->
keyType
=
keyType
;
pSkipList
->
comparator
=
getKeyComparator
(
keyType
);
pSkipList
->
pHead
.
pForward
=
(
tSkipListNode
**
)
calloc
(
1
,
POINTER_BYTES
*
MAX_SKIP_LIST_LEVEL
);
pSkipList
->
nMaxLevel
=
MAX_SKIP_LIST_LEVEL
;
pSkipList
->
nLevel
=
1
;
if
(
maxLevel
>
MAX_SKIP_LIST_LEVEL
)
{
maxLevel
=
MAX_SKIP_LIST_LEVEL
;
}
pSkipList
->
nMaxKeyLen
=
nMaxKeyLen
;
pSkipList
->
nMaxLevel
=
nMaxLevel
;
pSkipList
->
keyInfo
=
(
SSkipListKeyInfo
){.
type
=
keyType
,
.
len
=
keyLen
,
.
dupKey
=
dupKey
}
;
pSkipList
->
keyFn
=
fn
;
if
(
pthread_rwlock_init
(
&
pSkipList
->
lock
,
NULL
)
!=
0
)
{
tfree
(
pSkipList
->
pHead
.
pForward
);
tfree
(
pSkipList
);
return
NULL
;
}
pSkipList
->
comparFn
=
getKeyComparator
(
keyType
);
pSkipList
->
maxLevel
=
maxLevel
;
pSkipList
->
level
=
1
;
srand
(
time
(
NULL
));
pSkipList
->
state
.
nTotalMemSize
+=
sizeof
(
tSkipList
);
return
pSkipList
;
}
pSkipList
->
pHead
=
(
SSkipListNode
*
)
calloc
(
1
,
SL_NODE_HEADER_SIZE
(
maxLevel
));
pSkipList
->
pHead
->
level
=
pSkipList
->
maxLevel
;
static
void
doRemove
(
tSkipList
*
pSkipList
,
tSkipListNode
*
pNode
,
tSkipListNode
*
forward
[])
{
int32_t
level
=
pNode
->
nLevel
;
for
(
int32_t
j
=
level
-
1
;
j
>=
0
;
--
j
)
{
if
((
forward
[
j
]
->
pForward
[
j
]
!=
NULL
)
&&
(
forward
[
j
]
->
pForward
[
j
]
->
pForward
[
j
]))
{
forward
[
j
]
->
pForward
[
j
]
->
pForward
[
j
]
->
pBackward
[
j
]
=
forward
[
j
];
}
if
(
lock
)
{
pSkipList
->
lock
=
calloc
(
1
,
sizeof
(
pthread_rwlock_t
));
if
(
forward
[
j
]
->
pForward
[
j
]
!=
NULL
)
{
forward
[
j
]
->
pForward
[
j
]
=
forward
[
j
]
->
pForward
[
j
]
->
pForward
[
j
];
if
(
pthread_rwlock_init
(
pSkipList
->
lock
,
NULL
)
!=
0
)
{
tfree
(
pSkipList
->
pHead
);
tfree
(
pSkipList
);
return
NULL
;
}
}
pSkipList
->
state
.
nTotalMemSize
-=
(
sizeof
(
tSkipListNode
)
+
POINTER_BYTES
*
pNode
->
nLevel
*
2
);
removeNodeEachLevel
(
pSkipList
,
pNode
->
nLevel
);
srand
(
time
(
NULL
));
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList
->
state
.
nTotalMemSize
+=
sizeof
(
SSkipList
);
#endif
tfree
(
pNode
);
--
pSkipList
->
nSize
;
return
pSkipList
;
}
static
size_t
getOneNodeSize
(
const
tSkipListKey
*
pKey
,
int32_t
nLevel
)
{
size_t
size
=
sizeof
(
tSkipListNode
)
+
sizeof
(
intptr_t
)
*
(
nLevel
<<
1
);
if
(
pKey
->
nType
==
TSDB_DATA_TYPE_BINARY
)
{
size
+=
pKey
->
nLen
+
1
;
}
else
if
(
pKey
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
size
+=
(
pKey
->
nLen
+
1
)
*
TSDB_NCHAR_SIZE
;
// static void doRemove(SSkipList *pSkipList, SSkipListNode *pNode, SSkipListNode *forward[]) {
// int32_t level = pNode->level;
// for (int32_t j = level - 1; j >= 0; --j) {
// if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) {
// forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j];
// }
//
// if (forward[j]->pForward[j] != NULL) {
// forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j];
// }
// }
//
// pSkipList->state.nTotalMemSize -= (sizeof(SSkipListNode) + POINTER_BYTES * pNode->level * 2);
// removeNodeEachLevel(pSkipList, pNode->level);
//
// tfree(pNode);
// --pSkipList->size;
//}
void
*
tSkipListDestroy
(
SSkipList
*
pSkipList
)
{
if
(
pSkipList
==
NULL
)
{
return
NULL
;
}
return
size
;
}
if
(
pSkipList
->
lock
)
{
pthread_rwlock_wrlock
(
pSkipList
->
lock
);
}
static
tSkipListNode
*
tSkipListCreateNode
(
void
*
pData
,
const
tSkipListKey
*
pKey
,
int32_t
nLevel
)
{
size_t
nodeSize
=
getOneNodeSize
(
pKey
,
nLevel
);
tSkipListNode
*
pNode
=
(
tSkipListNode
*
)
calloc
(
1
,
nodeSize
);
SSkipListNode
*
pNode
=
SL_GET_FORWARD_POINTER
(
pSkipList
->
pHead
,
0
);
// pSkipList->pHead.pForward[0];
pNode
->
pForward
=
(
tSkipListNode
**
)(
&
pNode
[
1
]);
pNode
->
pBackward
=
(
pNode
->
pForward
+
nLevel
);
while
(
pNode
)
{
SSkipListNode
*
pTemp
=
pNode
;
pNode
=
SL_GET_FORWARD_POINTER
(
pNode
,
0
);
tfree
(
pTemp
);
}
pNode
->
pData
=
pData
;
tfree
(
pSkipList
->
pHead
)
;
pNode
->
key
=
*
pKey
;
if
(
pKey
->
nType
==
TSDB_DATA_TYPE_BINARY
)
{
p
Node
->
key
.
pz
=
(
char
*
)(
pNode
->
pBackward
+
nLevel
);
if
(
pSkipList
->
lock
)
{
pthread_rwlock_unlock
(
pSkipList
->
lock
);
p
thread_rwlock_destroy
(
pSkipList
->
lock
);
strcpy
(
pNode
->
key
.
pz
,
pKey
->
pz
);
pNode
->
key
.
pz
[
pKey
->
nLen
]
=
0
;
}
else
if
(
pKey
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
pNode
->
key
.
wpz
=
(
wchar_t
*
)(
pNode
->
pBackward
+
nLevel
);
wcsncpy
(
pNode
->
key
.
wpz
,
pKey
->
wpz
,
pKey
->
nLen
);
pNode
->
key
.
wpz
[
pKey
->
nLen
]
=
0
;
tfree
(
pSkipList
->
lock
);
}
pNode
->
nLevel
=
nLevel
;
return
pNode
;
}
tSkipListKey
tSkipListCreateKey
(
int32_t
type
,
char
*
val
,
size_t
keyLength
)
{
tSkipListKey
k
=
{
0
};
tVariantCreateFromBinary
(
&
k
,
val
,
(
uint32_t
)
keyLength
,
(
uint32_t
)
type
);
return
k
;
tfree
(
pSkipList
->
pHead
);
tfree
(
pSkipList
);
return
NULL
;
}
void
tSkipListDestroyKey
(
tSkipListKey
*
pKey
)
{
tVariantDestroy
(
pKey
);
}
void
*
tSkipListDestroy
(
tSkipList
*
pSkipList
)
{
void
tSkipListRandNodeInfo
(
SSkipList
*
pSkipList
,
int32_t
*
level
,
int32_t
*
headSize
)
{
if
(
pSkipList
==
NULL
)
{
return
NULL
;
}
pthread_rwlock_wrlock
(
&
pSkipList
->
lock
);
tSkipListNode
*
pNode
=
pSkipList
->
pHead
.
pForward
[
0
];
while
(
pNode
)
{
tSkipListNode
*
pTemp
=
pNode
;
pNode
=
pNode
->
pForward
[
0
];
tfree
(
pTemp
);
return
;
}
tfree
(
pSkipList
->
pHead
.
pForward
);
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
pthread_rwlock_destroy
(
&
pSkipList
->
lock
);
tfree
(
pSkipList
);
return
NULL
;
*
level
=
getSkipListRandLevel
(
pSkipList
);
*
headSize
=
SL_NODE_HEADER_SIZE
(
*
level
);
}
tSkipListNode
*
tSkipListPut
(
tSkipList
*
pSkipList
,
void
*
pData
,
tSkipListKey
*
pKey
,
int32_t
insertIdenticalKey
)
{
SSkipListNode
*
tSkipListPut
(
SSkipList
*
pSkipList
,
SSkipListNode
*
pNode
)
{
if
(
pSkipList
==
NULL
)
{
return
NULL
;
}
pthread_rwlock_wrlock
(
&
pSkipList
->
lock
);
if
(
pSkipList
->
lock
)
{
pthread_rwlock_wrlock
(
pSkipList
->
lock
);
}
// record one node is put into skiplist
tSkipListDoRecordPut
(
pSkipList
);
tSkipListNode
*
px
=
&
pSkipList
->
pHead
;
SSkipListNode
*
px
=
pSkipList
->
pHead
;
SSkipListNode
*
forward
[
MAX_SKIP_LIST_LEVEL
]
=
{
0
};
tSkipListNode
*
forward
[
MAX_SKIP_LIST_LEVEL
]
=
{
0
};
for
(
int32_t
i
=
pSkipList
->
nLevel
-
1
;
i
>=
0
;
--
i
)
{
while
(
px
->
pForward
[
i
]
!=
NULL
&&
(
pSkipList
->
comparator
(
&
px
->
pForward
[
i
]
->
key
,
pKey
)
<
0
))
{
px
=
px
->
pForward
[
i
];
for
(
int32_t
i
=
pSkipList
->
level
-
1
;
i
>=
0
;
--
i
)
{
SSkipListNode
*
p
=
SL_GET_FORWARD_POINTER
(
px
,
i
);
while
(
p
!=
NULL
)
{
char
*
key
=
SL_GET_NODE_KEY
(
pSkipList
,
p
);
char
*
newDatakey
=
SL_GET_NODE_KEY
(
pSkipList
,
pNode
);
// if the forward element is less than the specified key, forward one step
if
(
pSkipList
->
comparFn
(
key
,
newDatakey
)
<
0
)
{
px
=
p
;
p
=
SL_GET_FORWARD_POINTER
(
px
,
i
);
}
else
{
break
;
}
}
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList
->
state
.
nTotalStepsForInsert
++
;
#endif
forward
[
i
]
=
px
;
}
// if the skiplist does not allowed identical key inserted, the new data will be discarded.
if
((
insertIdenticalKey
==
0
)
&&
forward
[
0
]
!=
&
pSkipList
->
pHead
&&
(
pSkipList
->
comparator
(
&
forward
[
0
]
->
key
,
pKey
)
==
0
))
{
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
forward
[
0
];
// if the skip list does not allowed identical key inserted, the new data will be discarded.
if
(
pSkipList
->
keyInfo
.
dupKey
==
0
&&
forward
[
0
]
!=
pSkipList
->
pHead
)
{
char
*
key
=
SL_GET_NODE_KEY
(
pSkipList
,
forward
[
0
]);
char
*
pNewDataKey
=
SL_GET_NODE_KEY
(
pSkipList
,
pNode
);
if
(
pSkipList
->
comparFn
(
key
,
pNewDataKey
)
==
0
)
{
if
(
pSkipList
->
lock
)
{
pthread_rwlock_unlock
(
pSkipList
->
lock
);
}
return
forward
[
0
];
}
}
int32_t
nLevel
=
getSkipListNodeLevel
(
pSkipList
);
recordNodeEachLevel
(
pSkipList
,
nLevel
);
#if SKIP_LIST_RECORD_PERFORMANCE
recordNodeEachLevel
(
pSkipList
,
level
);
#endif
tSkipListNode
*
pNode
=
tSkipListCreateNode
(
pData
,
pKey
,
nLevel
);
tSkipListDoInsert
(
pSkipList
,
forward
,
nL
evel
,
pNode
);
int32_t
level
=
SL_GET_NODE_LEVEL
(
pNode
);
tSkipListDoInsert
(
pSkipList
,
forward
,
l
evel
,
pNode
);
pSkipList
->
nSize
+=
1
;
atomic_add_fetch_32
(
&
pSkipList
->
size
,
1
)
;
// char tmpstr[512] = {0};
// tVariantToString(&pNode->key, tmpstr);
// pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList,
// tmpstr, pSkipList->nSize);
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList
->
state
.
nTotalMemSize
+=
getOneNodeSize
(
pKey
,
level
);
#endif
pSkipList
->
state
.
nTotalMemSize
+=
getOneNodeSize
(
pKey
,
nLevel
);
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
if
(
pSkipList
->
lock
)
{
pthread_rwlock_unlock
(
pSkipList
->
lock
);
}
return
pNode
;
}
void
tSkipListDoInsert
(
tSkipList
*
pSkipList
,
tSkipListNode
**
forward
,
int32_t
nLevel
,
t
SkipListNode
*
pNode
)
{
for
(
int32_t
i
=
0
;
i
<
nL
evel
;
++
i
)
{
t
SkipListNode
*
x
=
forward
[
i
];
void
tSkipListDoInsert
(
SSkipList
*
pSkipList
,
SSkipListNode
**
forward
,
int32_t
level
,
S
SkipListNode
*
pNode
)
{
for
(
int32_t
i
=
0
;
i
<
l
evel
;
++
i
)
{
S
SkipListNode
*
x
=
forward
[
i
];
if
(
x
!=
NULL
)
{
pNode
->
pBackward
[
i
]
=
x
;
if
(
x
->
pForward
[
i
])
x
->
pForward
[
i
]
->
pBackward
[
i
]
=
pNode
;
SL_GET_BACKWARD_POINTER
(
pNode
,
i
)
=
x
;
pNode
->
pForward
[
i
]
=
x
->
pForward
[
i
];
x
->
pForward
[
i
]
=
pNode
;
SSkipListNode
*
pForward
=
SL_GET_FORWARD_POINTER
(
x
,
i
);
if
(
pForward
)
{
SL_GET_BACKWARD_POINTER
(
pForward
,
i
)
=
pNode
;
}
SL_GET_FORWARD_POINTER
(
pNode
,
i
)
=
SL_GET_FORWARD_POINTER
(
x
,
i
);
SL_GET_FORWARD_POINTER
(
x
,
i
)
=
pNode
;
}
else
{
pSkipList
->
pHead
.
pForward
[
i
]
=
pNode
;
pNode
->
pBackward
[
i
]
=
&
(
pSkipList
->
pHead
);
SL_GET_FORWARD_POINTER
(
pSkipList
->
pHead
,
i
)
=
pNode
;
SL_GET_BACKWARD_POINTER
(
pSkipList
->
pHead
,
i
)
=
(
pSkipList
->
pHead
);
}
}
}
tSkipListNode
*
tSkipListGetOne
(
tSkipList
*
pSkipList
,
tSkipListKey
*
pKey
)
{
int32_t
sLevel
=
pSkipList
->
nL
evel
-
1
;
SArray
*
tSkipListGet
(
SSkipList
*
pSkipList
,
SSkipListKey
pKey
,
int16_t
keyType
)
{
int32_t
sLevel
=
pSkipList
->
l
evel
-
1
;
int32_t
ret
=
-
1
;
tSkipListNode
*
x
=
&
pSkipList
->
pHead
;
// result list
SArray
*
sa
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SSkipListNode
*
pNode
=
pSkipList
->
pHead
;
if
(
pSkipList
->
lock
)
{
pthread_rwlock_rdlock
(
pSkipList
->
lock
);
}
pthread_rwlock_rdlock
(
&
pSkipList
->
lock
);
#if SKIP_LIST_RECORD_PERFORMANCE
pSkipList
->
state
.
queryCount
++
;
#endif
__compar_fn_t
filterCompar
ator
=
getKeyFilterComparator
(
pSkipList
,
pKey
->
n
Type
);
__compar_fn_t
filterCompar
Fn
=
getKeyFilterComparator
(
pSkipList
,
key
Type
);
for
(
int32_t
i
=
sLevel
;
i
>=
0
;
--
i
)
{
while
(
x
->
pForward
[
i
]
!=
NULL
&&
(
ret
=
filterComparator
(
&
x
->
pForward
[
i
]
->
key
,
pKey
))
<
0
)
{
x
=
x
->
pForward
[
i
];
}
if
(
ret
==
0
)
{
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
x
->
pForward
[
i
];
}
}
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
NULL
;
}
static
int32_t
tSkipListEndParQuery
(
tSkipList
*
pSkipList
,
tSkipListNode
*
pStartNode
,
tSkipListKey
*
pEndKey
,
int32_t
cond
,
tSkipListNode
***
pRes
)
{
pthread_rwlock_rdlock
(
&
pSkipList
->
lock
);
tSkipListNode
*
p
=
pStartNode
;
int32_t
numOfRes
=
0
;
__compar_fn_t
filterComparator
=
getKeyFilterComparator
(
pSkipList
,
pEndKey
->
nType
);
while
(
p
!=
NULL
)
{
int32_t
ret
=
filterComparator
(
&
p
->
key
,
pEndKey
);
if
(
ret
>
0
)
{
break
;
}
if
(
ret
<
0
)
{
numOfRes
++
;
p
=
p
->
pForward
[
0
];
}
else
if
(
ret
==
0
)
{
if
(
cond
==
TSDB_RELATION_LESS_EQUAL
)
{
numOfRes
++
;
p
=
p
->
pForward
[
0
];
SSkipListNode
*
pNext
=
SL_GET_FORWARD_POINTER
(
pNode
,
i
);
while
(
pNext
!=
NULL
)
{
char
*
key
=
SL_GET_NODE_KEY
(
pSkipList
,
pNext
);
if
((
ret
=
filterComparFn
(
key
,
pKey
))
<
0
)
{
pNode
=
pNext
;
pNext
=
SL_GET_FORWARD_POINTER
(
pNext
,
i
);
}
else
{
break
;
}
}
}
(
*
pRes
)
=
(
tSkipListNode
**
)
malloc
(
POINTER_BYTES
*
numOfRes
);
for
(
int32_t
i
=
0
;
i
<
numOfRes
;
++
i
)
{
(
*
pRes
)[
i
]
=
pStartNode
;
pStartNode
=
pStartNode
->
pForward
[
0
];
}
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
numOfRes
;
}
/*
* maybe return the copy of tSkipListNode would be better
*/
int32_t
tSkipListGets
(
tSkipList
*
pSkipList
,
tSkipListKey
*
pKey
,
tSkipListNode
***
pRes
)
{
(
*
pRes
)
=
NULL
;
tSkipListNode
*
pNode
=
tSkipListGetOne
(
pSkipList
,
pKey
);
if
(
pNode
==
NULL
)
{
return
0
;
}
__compar_fn_t
filterComparator
=
getKeyFilterComparator
(
pSkipList
,
pKey
->
nType
);
// backward check if previous nodes are with the same value.
tSkipListNode
*
pPrev
=
pNode
->
pBackward
[
0
];
while
((
pPrev
!=
&
pSkipList
->
pHead
)
&&
filterComparator
(
&
pPrev
->
key
,
pKey
)
==
0
)
{
pPrev
=
pPrev
->
pBackward
[
0
];
}
return
tSkipListEndParQuery
(
pSkipList
,
pPrev
->
pForward
[
0
],
&
pNode
->
key
,
TSDB_RELATION_LESS_EQUAL
,
pRes
);
}
static
tSkipListNode
*
tSkipListParQuery
(
tSkipList
*
pSkipList
,
tSkipListKey
*
pKey
,
int32_t
cond
)
{
int32_t
sLevel
=
pSkipList
->
nLevel
-
1
;
int32_t
ret
=
-
1
;
tSkipListNode
*
x
=
&
pSkipList
->
pHead
;
__compar_fn_t
filterComparator
=
getKeyFilterComparator
(
pSkipList
,
pKey
->
nType
);
pthread_rwlock_rdlock
(
&
pSkipList
->
lock
);
if
(
cond
==
TSDB_RELATION_LARGE_EQUAL
||
cond
==
TSDB_RELATION_LARGE
)
{
for
(
int32_t
i
=
sLevel
;
i
>=
0
;
--
i
)
{
while
(
x
->
pForward
[
i
]
!=
NULL
&&
(
ret
=
filterComparator
(
&
x
->
pForward
[
i
]
->
key
,
pKey
))
<
0
)
{
x
=
x
->
pForward
[
i
];
}
}
// backward check if previous nodes are with the same value.
if
(
cond
==
TSDB_RELATION_LARGE_EQUAL
&&
ret
==
0
)
{
tSkipListNode
*
pNode
=
x
->
pForward
[
0
];
while
((
pNode
->
pBackward
[
0
]
!=
&
pSkipList
->
pHead
)
&&
(
filterComparator
(
&
pNode
->
pBackward
[
0
]
->
key
,
pKey
)
==
0
))
{
pNode
=
pNode
->
pBackward
[
0
];
// find the qualified key
if
(
ret
==
0
)
{
if
(
pSkipList
->
lock
)
{
pthread_rwlock_unlock
(
pSkipList
->
lock
);
}
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
pNode
;
}
if
(
ret
>
0
||
cond
==
TSDB_RELATION_LARGE_EQUAL
)
{
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
x
->
pForward
[
0
];
}
else
{
// cond == TSDB_RELATION_LARGE && ret == 0
tSkipListNode
*
pn
=
x
->
pForward
[
0
];
while
(
pn
!=
NULL
&&
filterComparator
(
&
pn
->
key
,
pKey
)
==
0
)
{
pn
=
pn
->
pForward
[
0
];
SSkipListNode
*
pResult
=
SL_GET_FORWARD_POINTER
(
pNode
,
i
);
taosArrayPush
(
sa
,
&
pResult
);
// skip list does not allowed duplicated key, abort further retrieve data
if
(
!
pSkipList
->
keyInfo
.
dupKey
)
{
break
;
}
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
pn
;
}
}
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
NULL
;
}
int32_t
tSkipListIterateList
(
tSkipList
*
pSkipList
,
tSkipListNode
***
pRes
,
bool
(
*
fp
)(
tSkipListNode
*
,
void
*
),
void
*
param
)
{
(
*
pRes
)
=
(
tSkipListNode
**
)
calloc
(
1
,
POINTER_BYTES
*
pSkipList
->
nSize
);
if
(
NULL
==
*
pRes
)
{
pError
(
"error skiplist %p, malloc failed"
,
pSkipList
);
return
-
1
;
}
pthread_rwlock_rdlock
(
&
pSkipList
->
lock
);
tSkipListNode
*
pStartNode
=
pSkipList
->
pHead
.
pForward
[
0
];
int32_t
num
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSkipList
->
nSize
;
++
i
)
{
if
(
pStartNode
==
NULL
)
{
pError
(
"error skiplist %p, required length:%d, actual length:%d"
,
pSkipList
,
pSkipList
->
nSize
,
i
-
1
);
#ifdef _DEBUG_VIEW
tSkipListPrint
(
pSkipList
,
1
);
#endif
break
;
}
if
(
fp
==
NULL
||
(
fp
!=
NULL
&&
fp
(
pStartNode
,
param
)
==
true
))
{
(
*
pRes
)[
num
++
]
=
pStartNode
;
}
pStartNode
=
pStartNode
->
pForward
[
0
];
}
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
if
(
num
==
0
)
{
free
(
*
pRes
);
*
pRes
=
NULL
;
}
else
if
(
num
<
pSkipList
->
nSize
)
{
// free unused memory
char
*
tmp
=
realloc
((
*
pRes
),
num
*
POINTER_BYTES
);
assert
(
tmp
!=
NULL
);
*
pRes
=
(
tSkipListNode
**
)
tmp
;
}
return
num
;
}
int32_t
tSkipListIteratorReset
(
tSkipList
*
pSkipList
,
SSkipListIterator
*
iter
)
{
if
(
pSkipList
==
NULL
)
{
return
-
1
;
}
iter
->
pSkipList
=
pSkipList
;
pthread_rwlock_rdlock
(
&
pSkipList
->
lock
);
iter
->
cur
=
NULL
;
//pSkipList->pHead.pForward[0];
iter
->
num
=
pSkipList
->
nSize
;
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
0
;
}
bool
tSkipListIteratorNext
(
SSkipListIterator
*
iter
)
{
if
(
iter
->
num
==
0
||
iter
->
pSkipList
==
NULL
)
{
return
false
;
}
tSkipList
*
pSkipList
=
iter
->
pSkipList
;
pthread_rwlock_rdlock
(
&
pSkipList
->
lock
);
if
(
iter
->
cur
==
NULL
)
{
iter
->
cur
=
pSkipList
->
pHead
.
pForward
[
0
];
}
else
{
iter
->
cur
=
iter
->
cur
->
pForward
[
0
];
}
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
iter
->
cur
!=
NULL
;
}
tSkipListNode
*
tSkipListIteratorGet
(
SSkipListIterator
*
iter
)
{
return
iter
->
cur
;
}
int32_t
tSkipListRangeQuery
(
tSkipList
*
pSkipList
,
tSKipListQueryCond
*
pCond
,
tSkipListNode
***
pRes
)
{
pSkipList
->
state
.
queryCount
++
;
tSkipListNode
*
pStart
=
tSkipListParQuery
(
pSkipList
,
&
pCond
->
lowerBnd
,
pCond
->
lowerBndRelOptr
);
if
(
pStart
==
0
)
{
*
pRes
=
NULL
;
return
0
;
}
return
tSkipListEndParQuery
(
pSkipList
,
pStart
,
&
pCond
->
upperBnd
,
pCond
->
upperBndRelOptr
,
pRes
);
}
static
bool
removeSupport
(
tSkipList
*
pSkipList
,
tSkipListNode
**
forward
,
tSkipListKey
*
pKey
)
{
__compar_fn_t
filterComparator
=
getKeyFilterComparator
(
pSkipList
,
pKey
->
nType
);
if
(
filterComparator
(
&
forward
[
0
]
->
pForward
[
0
]
->
key
,
pKey
)
==
0
)
{
tSkipListNode
*
p
=
forward
[
0
]
->
pForward
[
0
];
doRemove
(
pSkipList
,
p
,
forward
);
}
else
{
// failed to find the node of specified value,abort
return
false
;
}
// compress the minimum level of skip list
while
(
pSkipList
->
nLevel
>
0
&&
pSkipList
->
pHead
.
pForward
[
pSkipList
->
nLevel
-
1
]
==
NULL
)
{
pSkipList
->
nLevel
-=
1
;
}
return
true
;
}
void
tSkipListRemoveNode
(
tSkipList
*
pSkipList
,
tSkipListNode
*
pNode
)
{
tSkipListNode
*
forward
[
MAX_SKIP_LIST_LEVEL
]
=
{
0
};
pthread_rwlock_rdlock
(
&
pSkipList
->
lock
);
for
(
int32_t
i
=
0
;
i
<
pNode
->
nLevel
;
++
i
)
{
forward
[
i
]
=
pNode
->
pBackward
[
i
];
}
removeSupport
(
pSkipList
,
forward
,
&
pNode
->
key
);
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
}
bool
tSkipListRemove
(
tSkipList
*
pSkipList
,
tSkipListKey
*
pKey
)
{
tSkipListNode
*
forward
[
MAX_SKIP_LIST_LEVEL
]
=
{
0
};
__compar_fn_t
filterComparator
=
getKeyFilterComparator
(
pSkipList
,
pKey
->
nType
);
pthread_rwlock_rdlock
(
&
pSkipList
->
lock
);
tSkipListNode
*
x
=
&
pSkipList
->
pHead
;
for
(
int32_t
i
=
pSkipList
->
nLevel
-
1
;
i
>=
0
;
--
i
)
{
while
(
x
->
pForward
[
i
]
!=
NULL
&&
(
filterComparator
(
&
x
->
pForward
[
i
]
->
key
,
pKey
)
<
0
))
{
x
=
x
->
pForward
[
i
];
}
forward
[
i
]
=
x
;
}
bool
ret
=
removeSupport
(
pSkipList
,
forward
,
pKey
);
pthread_rwlock_unlock
(
&
pSkipList
->
lock
);
return
ret
;
}
void
tSkipListPrint
(
tSkipList
*
pSkipList
,
int16_t
nlevel
)
{
if
(
pSkipList
==
NULL
||
pSkipList
->
nLevel
<
nlevel
||
nlevel
<=
0
)
{
if
(
pSkipList
->
lock
)
{
pthread_rwlock_unlock
(
pSkipList
->
lock
);
}
return
sa
;
}
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
// int32_t cond, SSkipListNode ***pRes) {
// pthread_rwlock_rdlock(&pSkipList->lock);
// SSkipListNode *p = pStartNode;
// int32_t numOfRes = 0;
//
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pEndKey->nType);
// while (p != NULL) {
// int32_t ret = filterComparFn(&p->key, pEndKey);
// if (ret > 0) {
// break;
// }
//
// if (ret < 0) {
// numOfRes++;
// p = p->pForward[0];
// } else if (ret == 0) {
// if (cond == TSDB_RELATION_LESS_EQUAL) {
// numOfRes++;
// p = p->pForward[0];
// } else {
// break;
// }
// }
// }
//
// (*pRes) = (SSkipListNode **)malloc(POINTER_BYTES * numOfRes);
// for (int32_t i = 0; i < numOfRes; ++i) {
// (*pRes)[i] = pStartNode;
// pStartNode = pStartNode->pForward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return numOfRes;
//}
//
///*
// * maybe return the copy of SSkipListNode would be better
// */
// int32_t tSkipListGets(SSkipList *pSkipList, SSkipListKey *pKey, SSkipListNode ***pRes) {
// (*pRes) = NULL;
//
// SSkipListNode *pNode = tSkipListGet(pSkipList, pKey);
// if (pNode == NULL) {
// return 0;
// }
//
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType);
//
// // backward check if previous nodes are with the same value.
// SSkipListNode *pPrev = pNode->pBackward[0];
// while ((pPrev != &pSkipList->pHead) && filterComparFn(&pPrev->key, pKey) == 0) {
// pPrev = pPrev->pBackward[0];
// }
//
// return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
//}
//
// static SSkipListNode *tSkipListParQuery(SSkipList *pSkipList, SSkipListKey *pKey, int32_t cond) {
// int32_t sLevel = pSkipList->level - 1;
// int32_t ret = -1;
//
// SSkipListNode *x = &pSkipList->pHead;
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType);
//
// pthread_rwlock_rdlock(&pSkipList->lock);
//
// if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) {
// for (int32_t i = sLevel; i >= 0; --i) {
// while (x->pForward[i] != NULL && (ret = filterComparFn(&x->pForward[i]->key, pKey)) < 0) {
// x = x->pForward[i];
// }
// }
//
// // backward check if previous nodes are with the same value.
// if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) {
// SSkipListNode *pNode = x->pForward[0];
// while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparFn(&pNode->pBackward[0]->key, pKey) == 0)) {
// pNode = pNode->pBackward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
// return pNode;
// }
//
// if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) {
// pthread_rwlock_unlock(&pSkipList->lock);
// return x->pForward[0];
// } else { // cond == TSDB_RELATION_LARGE && ret == 0
// SSkipListNode *pn = x->pForward[0];
// while (pn != NULL && filterComparFn(&pn->key, pKey) == 0) {
// pn = pn->pForward[0];
// }
// pthread_rwlock_unlock(&pSkipList->lock);
// return pn;
// }
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
// return NULL;
//}
//
// int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *),
// void *param) {
// (*pRes) = (SSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize);
// if (NULL == *pRes) {
// pError("error skiplist %p, malloc failed", pSkipList);
// return -1;
// }
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// SSkipListNode *pStartNode = pSkipList->pHead.pForward[0];
// int32_t num = 0;
//
// for (int32_t i = 0; i < pSkipList->nSize; ++i) {
// if (pStartNode == NULL) {
// pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1);
//#ifdef _DEBUG_VIEW
// tSkipListPrint(pSkipList, 1);
//#endif
// break;
// }
//
// if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) {
// (*pRes)[num++] = pStartNode;
// }
//
// pStartNode = pStartNode->pForward[0];
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
//
// if (num == 0) {
// free(*pRes);
// *pRes = NULL;
// } else if (num < pSkipList->nSize) { // free unused memory
// char *tmp = realloc((*pRes), num * POINTER_BYTES);
// assert(tmp != NULL);
//
// *pRes = (SSkipListNode **)tmp;
// }
//
// return num;
//}
//
// int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter) {
// if (pSkipList == NULL) {
// return -1;
// }
//
// iter->pSkipList = pSkipList;
// if (pSkipList->lock) {
// pthread_rwlock_rdlock(&pSkipList->lock);
// }
// iter->cur = NULL; // pSkipList->pHead.pForward[0];
// iter->num = pSkipList->size;
//
// if (pSkipList->lock) {
// pthread_rwlock_unlock(&pSkipList->lock);
// }
//
// return 0;
//}
//
// bool tSkipListIteratorNext(SSkipListIterator *iter) {
// if (iter->num == 0 || iter->pSkipList == NULL) {
// return false;
// }
//
// SSkipList *pSkipList = iter->pSkipList;
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// if (iter->cur == NULL) {
// iter->cur = pSkipList->pHead.pForward[0];
// } else {
// iter->cur = iter->cur->pForward[0];
// }
//
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return iter->cur != NULL;
//}
//
// SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter) { return iter->cur; }
//
// int32_t tSkipListRangeQuery(SSkipList *pSkipList, tSKipListQueryCond *pCond, SSkipListNode ***pRes) {
// pSkipList->state.queryCount++;
// SSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
// if (pStart == 0) {
// *pRes = NULL;
// return 0;
// }
//
// return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes);
//}
//
// static bool removeSupport(SSkipList *pSkipList, SSkipListNode **forward, SSkipListKey *pKey) {
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType);
//
// if (filterComparFn(&forward[0]->pForward[0]->key, pKey) == 0) {
// SSkipListNode *p = forward[0]->pForward[0];
// doRemove(pSkipList, p, forward);
// } else { // failed to find the node of specified value,abort
// return false;
// }
//
// // compress the minimum level of skip list
// while (pSkipList->level > 0 && pSkipList->pHead.pForward[pSkipList->level - 1] == NULL) {
// pSkipList->level -= 1;
// }
//
// return true;
//}
//
// void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
//
// pthread_rwlock_rdlock(&pSkipList->lock);
// for (int32_t i = 0; i < pNode->level; ++i) {
// forward[i] = pNode->pBackward[i];
// }
//
// removeSupport(pSkipList, forward, &pNode->key);
// pthread_rwlock_unlock(&pSkipList->lock);
//}
//
// bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) {
// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
// __compar_fn_t filterComparFn = getKeyFilterComparator(pSkipList, pKey->nType);
//
// pthread_rwlock_rdlock(&pSkipList->lock);
//
// SSkipListNode *x = &pSkipList->pHead;
// for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
// while (x->pForward[i] != NULL && (filterComparFn(&x->pForward[i]->key, pKey) < 0)) {
// x = x->pForward[i];
// }
// forward[i] = x;
// }
//
// bool ret = removeSupport(pSkipList, forward, pKey);
// pthread_rwlock_unlock(&pSkipList->lock);
//
// return ret;
//}
void
tSkipListPrint
(
SSkipList
*
pSkipList
,
int16_t
nlevel
)
{
if
(
pSkipList
==
NULL
||
pSkipList
->
level
<
nlevel
||
nlevel
<=
0
)
{
return
;
}
tSkipListNode
*
p
=
pSkipList
->
pHead
.
pForward
[
nlevel
-
1
]
;
SSkipListNode
*
p
=
SL_GET_FORWARD_POINTER
(
pSkipList
->
pHead
,
nlevel
-
1
)
;
int32_t
id
=
1
;
while
(
p
)
{
switch
(
pSkipList
->
keyType
)
{
char
*
key
=
SL_GET_NODE_KEY
(
pSkipList
,
p
);
switch
(
pSkipList
->
keyInfo
.
type
)
{
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_BIGINT
:
fprintf
(
stdout
,
"%d: %"
PRId64
"
\n
"
,
id
++
,
p
->
key
.
i64K
ey
);
fprintf
(
stdout
,
"%d: %"
PRId64
"
\n
"
,
id
++
,
*
(
int64_t
*
)
k
ey
);
break
;
case
TSDB_DATA_TYPE_BINARY
:
fprintf
(
stdout
,
"%d: %s
\n
"
,
id
++
,
p
->
key
.
pz
);
fprintf
(
stdout
,
"%d: %s
\n
"
,
id
++
,
key
);
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
fprintf
(
stdout
,
"%d: %lf
\n
"
,
id
++
,
p
->
key
.
dK
ey
);
fprintf
(
stdout
,
"%d: %lf
\n
"
,
id
++
,
*
(
double
*
)
k
ey
);
break
;
default:
fprintf
(
stdout
,
"
\n
"
);
}
p
=
p
->
pForward
[
nlevel
-
1
];
}
}
/*
* query processor based on query condition
*/
int32_t
tSkipListQuery
(
tSkipList
*
pSkipList
,
tSKipListQueryCond
*
pQueryCond
,
tSkipListNode
***
pResult
)
{
// query condition check
int32_t
rel
=
0
;
__compar_fn_t
comparator
=
getKeyComparator
(
pQueryCond
->
lowerBnd
.
nType
);
if
(
pSkipList
==
NULL
||
pQueryCond
==
NULL
||
pSkipList
->
nSize
==
0
||
(((
rel
=
comparator
(
&
pQueryCond
->
lowerBnd
,
&
pQueryCond
->
upperBnd
))
>
0
&&
pQueryCond
->
lowerBnd
.
nType
!=
TSDB_DATA_TYPE_NCHAR
&&
pQueryCond
->
lowerBnd
.
nType
!=
TSDB_DATA_TYPE_BINARY
)))
{
(
*
pResult
)
=
NULL
;
return
0
;
}
if
(
rel
==
0
)
{
/*
* 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd
* point query
*/
if
(
pQueryCond
->
lowerBndRelOptr
==
TSDB_RELATION_LARGE_EQUAL
&&
pQueryCond
->
upperBndRelOptr
==
TSDB_RELATION_LESS_EQUAL
)
{
// point query
return
tSkipListGets
(
pSkipList
,
&
pQueryCond
->
lowerBnd
,
pResult
);
}
else
{
(
*
pResult
)
=
NULL
;
return
0
;
}
}
else
{
/* range query, query operation code check */
return
tSkipListRangeQuery
(
pSkipList
,
pQueryCond
,
pResult
);
}
}
typedef
struct
MultipleQueryResult
{
int32_t
len
;
tSkipListNode
**
pData
;
}
MultipleQueryResult
;
static
int32_t
mergeQueryResult
(
MultipleQueryResult
*
pResults
,
int32_t
numOfResSet
,
tSkipListNode
***
pRes
)
{
int32_t
total
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfResSet
;
++
i
)
{
total
+=
pResults
[
i
].
len
;
}
(
*
pRes
)
=
malloc
(
POINTER_BYTES
*
total
);
int32_t
idx
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfResSet
;
++
i
)
{
MultipleQueryResult
*
pOneResult
=
&
pResults
[
i
];
for
(
int32_t
j
=
0
;
j
<
pOneResult
->
len
;
++
j
)
{
(
*
pRes
)[
idx
++
]
=
pOneResult
->
pData
[
j
];
}
}
return
total
;
}
static
void
removeDuplicateKey
(
tSkipListKey
*
pKey
,
int32_t
*
numOfKey
,
__compar_fn_t
comparator
)
{
if
(
*
numOfKey
==
1
)
{
return
;
}
qsort
(
pKey
,
*
numOfKey
,
sizeof
(
pKey
[
0
]),
comparator
);
int32_t
i
=
0
,
j
=
1
;
while
(
i
<
(
*
numOfKey
)
&&
j
<
(
*
numOfKey
))
{
int32_t
ret
=
comparator
(
&
pKey
[
i
],
&
pKey
[
j
]);
if
(
ret
==
0
)
{
j
++
;
}
else
{
pKey
[
i
+
1
]
=
pKey
[
j
];
i
++
;
j
++
;
}
}
(
*
numOfKey
)
=
i
+
1
;
}
int32_t
mergeResult
(
const
tSkipListKey
*
pKey
,
int32_t
numOfKey
,
tSkipListNode
***
pRes
,
__compar_fn_t
comparator
,
tSkipListNode
*
pNode
)
{
int32_t
i
=
0
,
j
=
0
;
// merge two sorted arrays in O(n) time
while
(
i
<
numOfKey
&&
pNode
!=
NULL
)
{
int32_t
ret
=
comparator
(
&
pNode
->
key
,
&
pKey
[
i
]);
if
(
ret
<
0
)
{
(
*
pRes
)[
j
++
]
=
pNode
;
pNode
=
pNode
->
pForward
[
0
];
}
else
if
(
ret
==
0
)
{
pNode
=
pNode
->
pForward
[
0
];
}
else
{
// pNode->key > pkey[i]
i
++
;
}
}
while
(
pNode
!=
NULL
)
{
(
*
pRes
)[
j
++
]
=
pNode
;
pNode
=
pNode
->
pForward
[
0
];
}
return
j
;
}
int32_t
tSkipListPointQuery
(
tSkipList
*
pSkipList
,
tSkipListKey
*
pKey
,
int32_t
numOfKey
,
tSkipListPointQueryType
type
,
tSkipListNode
***
pRes
)
{
if
(
numOfKey
==
0
||
pKey
==
NULL
||
pSkipList
==
NULL
||
pSkipList
->
nSize
==
0
||
(
type
!=
INCLUDE_POINT_QUERY
&&
type
!=
EXCLUDE_POINT_QUERY
))
{
(
*
pRes
)
=
NULL
;
return
0
;
}
__compar_fn_t
comparator
=
getKeyComparator
(
pKey
->
nType
);
removeDuplicateKey
(
pKey
,
&
numOfKey
,
comparator
);
if
(
type
==
INCLUDE_POINT_QUERY
)
{
if
(
numOfKey
==
1
)
{
return
tSkipListGets
(
pSkipList
,
&
pKey
[
0
],
pRes
);
}
else
{
MultipleQueryResult
*
pTempResult
=
(
MultipleQueryResult
*
)
malloc
(
sizeof
(
MultipleQueryResult
)
*
numOfKey
);
for
(
int32_t
i
=
0
;
i
<
numOfKey
;
++
i
)
{
pTempResult
[
i
].
len
=
tSkipListGets
(
pSkipList
,
&
pKey
[
i
],
&
pTempResult
[
i
].
pData
);
}
int32_t
num
=
mergeQueryResult
(
pTempResult
,
numOfKey
,
pRes
);
for
(
int32_t
i
=
0
;
i
<
numOfKey
;
++
i
)
{
free
(
pTempResult
[
i
].
pData
);
}
free
(
pTempResult
);
return
num
;
}
}
else
{
// exclude query
*
pRes
=
malloc
(
POINTER_BYTES
*
pSkipList
->
nSize
);
__compar_fn_t
filterComparator
=
getKeyFilterComparator
(
pSkipList
,
pKey
->
nType
);
tSkipListNode
*
pNode
=
pSkipList
->
pHead
.
pForward
[
0
];
int32_t
retLen
=
mergeResult
(
pKey
,
numOfKey
,
pRes
,
filterComparator
,
pNode
);
if
(
retLen
<
pSkipList
->
nSize
)
{
(
*
pRes
)
=
realloc
(
*
pRes
,
POINTER_BYTES
*
retLen
);
}
return
retLen
;
p
=
SL_GET_FORWARD_POINTER
(
p
,
nlevel
-
1
);
// p = p->pForward[nlevel - 1];
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录