Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9fe46510
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9fe46510
编写于
12月 27, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
12月 27, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9396 from taosdata/origin/feature/index_cache
add flush-helper function
上级
408b1168
f99f6ec8
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
200 addition
and
154 deletion
+200
-154
include/libs/index/index.h
include/libs/index/index.h
+12
-0
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+28
-31
source/libs/index/inc/index_cache.h
source/libs/index/inc/index_cache.h
+16
-9
source/libs/index/inc/index_tfile.h
source/libs/index/inc/index_tfile.h
+4
-0
source/libs/index/src/index.c
source/libs/index/src/index.c
+46
-33
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+74
-64
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+14
-12
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+6
-5
未找到文件。
include/libs/index/index.h
浏览文件 @
9fe46510
...
...
@@ -85,6 +85,18 @@ SIndexTerm* indexTermCreate(int64_t suid,
int32_t
nColVal
);
void
indexTermDestroy
(
SIndexTerm
*
p
);
/*
* init index
*
*/
int32_t
indexInit
();
/*
* destory index
*
*/
void
indexCleanUp
();
#ifdef __cplusplus
}
#endif
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
9fe46510
...
...
@@ -49,7 +49,6 @@ struct SIndex {
SHashObj
*
colObj
;
// < field name, field id>
int64_t
suid
;
// current super table id, -1 is normal table
int
colId
;
// field id allocated to cache
int32_t
cVersion
;
// current version allocated to cache
SIndexStat
stat
;
...
...
@@ -88,41 +87,39 @@ typedef struct SIndexTermQuery {
EIndexQueryType
qType
;
}
SIndexTermQuery
;
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLog("index FATAL ", 255, __VA_ARGS__); \
} \
typedef
struct
Iterate
{
void
*
iter
;
int8_t
type
;
char
*
colVal
;
SArray
*
val
;
}
Iterate
;
extern
void
*
indexQhandle
;
int
indexFlushCacheTFile
(
SIndex
*
sIdx
,
void
*
);
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
} while (0)
#define indexError(...) \
do { \
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLog("index ERROR ", 255, __VA_ARGS__); \
} \
#define indexError(...) \
do { \
if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \
} while (0)
#define indexWarn(...) \
do { \
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("index WARN ", 255, __VA_ARGS__); \
} \
#define indexWarn(...) \
do { \
if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \
} while (0)
#define indexInfo(...) \
do { \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("index ", 255, __VA_ARGS__); \
} \
#define indexInfo(...) \
do { \
if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \
} while (0)
#define indexDebug(...) \
do { \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
#define indexDebug(...) \
do { \
if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \
} while (0)
#define indexTrace(...) \
do { \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
#define indexTrace(...) \
do { \
if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \
} while (0)
#ifdef __cplusplus
...
...
source/libs/index/inc/index_cache.h
浏览文件 @
9fe46510
...
...
@@ -22,10 +22,8 @@
// ----------------- key structure in skiplist ---------------------
/* A data row, the format is like below:
* content: |<--totalLen-->|<-- fieldid-->|<--field type-->|<-- value len--->|
* |<-- value -->|<--uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|
* <--valuelen->|<--uint64_t->| * <-- int32_t-->|<-- int8_t --->|
* content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
*/
#ifdef __cplusplus
...
...
@@ -34,12 +32,17 @@ extern "C" {
typedef
struct
IndexCache
{
T_REF_DECLARE
()
SSkipList
*
skiplist
;
SSkipList
*
mem
,
*
imm
;
SIndex
*
index
;
char
*
colName
;
int32_t
version
;
int32_t
nTerm
;
int8_t
type
;
}
IndexCache
;
typedef
struct
CacheTerm
{
// key
int32_t
colId
;
int32_t
nColVal
;
char
*
colVal
;
int32_t
version
;
...
...
@@ -49,14 +52,18 @@ typedef struct CacheTerm {
SIndexOperOnColumn
operaType
;
}
CacheTerm
;
//
IndexCache
*
indexCacheCreate
();
IndexCache
*
indexCacheCreate
(
SIndex
*
idx
,
const
char
*
colName
,
int8_t
type
);
void
indexCacheDestroy
(
void
*
cache
);
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
);
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
);
// int indexCacheGet(void *cache, uint64_t *rst);
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
);
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SArray
*
result
,
STermValueType
*
s
);
void
indexCacheRef
(
IndexCache
*
cache
);
void
indexCacheUnRef
(
IndexCache
*
cache
);
void
indexCacheDebug
(
IndexCache
*
cache
);
#ifdef __cplusplus
...
...
source/libs/index/inc/index_tfile.h
浏览文件 @
9fe46510
...
...
@@ -105,9 +105,13 @@ void tfileCacheDestroy(TFileCache* tcache);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
);
void
tfileCachePut
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
,
TFileReader
*
reader
);
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
char
*
colName
);
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
);
void
tfileReaderDestroy
(
TFileReader
*
reader
);
int
tfileReaderSearch
(
TFileReader
*
reader
,
SIndexTermQuery
*
query
,
SArray
*
result
);
void
tfileReaderRef
(
TFileReader
*
reader
);
void
tfileReaderUnRef
(
TFileReader
*
reader
);
TFileWriter
*
tfileWriterCreate
(
WriterCtx
*
ctx
,
TFileHeader
*
header
);
void
tfileWriterDestroy
(
TFileWriter
*
tw
);
...
...
source/libs/index/src/index.c
浏览文件 @
9fe46510
...
...
@@ -18,11 +18,26 @@
#include "index_cache.h"
#include "index_tfile.h"
#include "tdef.h"
#include "tsched.h"
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
#endif
#define INDEX_NUM_OF_THREADS 4
#define INDEX_QUEUE_SIZE 4
void
*
indexQhandle
=
NULL
;
int32_t
indexInit
()
{
indexQhandle
=
taosInitScheduler
(
INDEX_QUEUE_SIZE
,
INDEX_NUM_OF_THREADS
,
"index"
);
return
indexQhandle
==
NULL
?
-
1
:
0
;
// do nothing
}
void
indexCleanUp
()
{
taosCleanUpScheduler
(
indexQhandle
);
}
static
int
uidCompare
(
const
void
*
a
,
const
void
*
b
)
{
uint64_t
u1
=
*
(
uint64_t
*
)
a
;
uint64_t
u2
=
*
(
uint64_t
*
)
b
;
...
...
@@ -38,16 +53,15 @@ typedef struct SIdxColInfo {
}
SIdxColInfo
;
static
pthread_once_t
isInit
=
PTHREAD_ONCE_INIT
;
static
void
indexInit
();
//
static void indexInit();
static
int
indexTermSearch
(
SIndex
*
sIdx
,
SIndexTermQuery
*
term
,
SArray
**
result
);
static
int
indexFlushCacheTFile
(
SIndex
*
sIdx
);
static
void
indexInterResultsDestroy
(
SArray
*
results
);
static
int
indexMergeFinalResults
(
SArray
*
interResults
,
EIndexOperatorType
oType
,
SArray
*
finalResult
);
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
pthread_once
(
&
isInit
,
indexInit
);
//
pthread_once(&isInit, indexInit);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
...
...
@@ -57,10 +71,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#endif
#ifdef USE_INVERTED_INDEX
sIdx
->
cache
=
(
void
*
)
indexCacheCreate
(
);
sIdx
->
tindex
=
NULL
;
// sIdx->cache = (void*)indexCacheCreate(sIdx
);
sIdx
->
tindex
=
indexTFileCreate
(
path
)
;
sIdx
->
colObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
colId
=
1
;
sIdx
->
cVersion
=
1
;
pthread_mutex_init
(
&
sIdx
->
mtx
,
NULL
);
...
...
@@ -80,6 +93,12 @@ void indexClose(SIndex* sIdx) {
#ifdef USE_INVERTED_INDEX
indexCacheDestroy
(
sIdx
->
cache
);
void
*
iter
=
taosHashIterate
(
sIdx
->
colObj
,
NULL
);
while
(
iter
)
{
IndexCache
**
pCache
=
iter
;
if
(
*
pCache
)
{
indexCacheUnRef
(
*
pCache
);
}
iter
=
taosHashIterate
(
sIdx
->
colObj
,
iter
);
}
taosHashCleanup
(
sIdx
->
colObj
);
pthread_mutex_destroy
(
&
sIdx
->
mtx
);
#endif
...
...
@@ -110,29 +129,24 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
pthread_mutex_lock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdxColInfo
*
fi
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
if
(
fi
==
NULL
)
{
SIdxColInfo
tfi
=
{.
colId
=
index
->
colId
};
index
->
cVersion
++
;
index
->
colId
++
;
taosHashPut
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
,
&
tfi
,
sizeof
(
tfi
));
}
else
{
// TODO, del
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
if
(
*
cache
==
NULL
)
{
IndexCache
*
pCache
=
indexCacheCreate
(
index
,
p
->
colName
,
p
->
colType
);
taosHashPut
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
,
&
pCache
,
sizeof
(
void
*
));
}
}
pthread_mutex_unlock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdxColInfo
*
fi
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
assert
(
fi
!=
NULL
);
int32_t
colId
=
fi
->
colId
;
int32_t
version
=
index
->
cVersion
;
int
ret
=
indexCachePut
(
index
->
cache
,
p
,
colId
,
version
,
uid
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
assert
(
*
cache
!=
NULL
);
int
ret
=
indexCachePut
(
*
cache
,
p
,
uid
);
if
(
ret
!=
0
)
{
return
ret
;
}
}
#endif
#endif
return
0
;
}
int
indexSearch
(
SIndex
*
index
,
SIndexMultiTermQuery
*
multiQuerys
,
SArray
*
result
)
{
...
...
@@ -281,32 +295,26 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms) {
taosArrayDestroy
(
terms
);
}
void
indexInit
()
{
// do nothing
}
static
int
indexTermSearch
(
SIndex
*
sIdx
,
SIndexTermQuery
*
query
,
SArray
**
result
)
{
int32_t
version
=
-
1
;
int16_t
colId
=
-
1
;
SIdxColInfo
*
colInfo
=
NULL
;
SIndexTerm
*
term
=
query
->
term
;
const
char
*
colName
=
term
->
colName
;
int32_t
nColName
=
term
->
nColName
;
// Get col info
IndexCache
*
cache
=
NULL
;
pthread_mutex_lock
(
&
sIdx
->
mtx
);
colInfo
=
taosHashGet
(
sIdx
->
colObj
,
colName
,
nColName
);
if
(
colInfo
==
NULL
)
{
IndexCache
**
pCache
=
taosHashGet
(
sIdx
->
colObj
,
colName
,
nColName
);
if
(
*
pCache
==
NULL
)
{
pthread_mutex_unlock
(
&
sIdx
->
mtx
);
return
-
1
;
}
colId
=
colInfo
->
colId
;
version
=
colInfo
->
cVersion
;
cache
=
*
pCache
;
pthread_mutex_unlock
(
&
sIdx
->
mtx
);
*
result
=
taosArrayInit
(
4
,
sizeof
(
uint64_t
));
// TODO: iterator mem and tidex
STermValueType
s
;
if
(
0
==
indexCacheSearch
(
sIdx
->
cache
,
query
,
colId
,
version
,
*
result
,
&
s
))
{
if
(
0
==
indexCacheSearch
(
cache
,
query
,
*
result
,
&
s
))
{
if
(
s
==
kTypeDeletion
)
{
indexInfo
(
"col: %s already drop by other opera"
,
term
->
colName
);
// coloum already drop by other oper, no need to query tindex
...
...
@@ -353,9 +361,14 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
}
return
0
;
}
static
int
indexFlushCacheTFile
(
SIndex
*
sIdx
)
{
int
indexFlushCacheTFile
(
SIndex
*
sIdx
,
void
*
cache
)
{
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
indexWarn
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
IndexCache
*
pCache
=
(
IndexCache
*
)
cache
;
TFileReader
*
pReader
=
tfileGetReaderByCol
(
sIdx
->
tindex
,
pCache
->
colName
);
tfileReaderUnRef
(
pReader
);
indexCacheUnRef
(
pCache
);
return
0
;
}
source/libs/index/src/index_cache.c
浏览文件 @
9fe46510
...
...
@@ -16,9 +16,11 @@
#include "index_cache.h"
#include "index_util.h"
#include "tcompare.h"
#include "tsched.h"
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define CACH_LIMIT 1000000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
...
...
@@ -38,9 +40,6 @@ static int32_t compareKey(const void* l, const void* r) {
CacheTerm
*
lt
=
(
CacheTerm
*
)
l
;
CacheTerm
*
rt
=
(
CacheTerm
*
)
r
;
// compare colId
if
(
lt
->
colId
!=
rt
->
colId
)
{
return
lt
->
colId
-
rt
->
colId
;
}
// compare colVal
int
i
,
j
;
for
(
i
=
0
,
j
=
0
;
i
<
lt
->
nColVal
&&
j
<
rt
->
nColVal
;
i
++
,
j
++
)
{
...
...
@@ -56,71 +55,40 @@ static int32_t compareKey(const void* l, const void* r) {
return
-
1
;
}
// compare version
return
rt
->
version
-
lt
->
version
;
}
// char* lp = (char*)l;
// char* rp = (char*)r;
//// compare col id
// int16_t lf, rf; // cold id
// memcpy(&lf, lp, sizeof(lf));
// memcpy(&rf, rp, sizeof(rf));
// if (lf != rf) { return lf < rf ? -1 : 1; }
// lp += sizeof(lf);
// rp += sizeof(rf);
//// skip value len
// int32_t lfl, rfl;
// memcpy(&lfl, lp, sizeof(lfl));
// memcpy(&rfl, rp, sizeof(rfl));
// lp += sizeof(lfl);
// rp += sizeof(rfl);
//// compare value
// int32_t i, j;
// for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) {
// if (lp[i] == rp[j]) {
// continue;
// } else {
// return lp[i] < rp[j] ? -1 : 1;
// }
//}
// if (i < lfl) {
// return 1;
//} else if (j < rfl) {
// return -1;
//}
// lp += lfl;
// rp += rfl;
//// compare version, desc order
// int32_t lv, rv;
// memcpy(&lv, lp, sizeof(lv));
// memcpy(&rv, rp, sizeof(rv));
// if (lv != rv) { return lv < rv ? 1 : -1; }
// return 0;
static
SSkipList
*
indexInternalCacheCreate
(
int8_t
type
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
)
{
return
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
type
,
MAX_INDEX_KEY_LEN
,
compareKey
,
SL_ALLOW_DUP_KEY
,
getIndexKey
);
}
}
IndexCache
*
indexCacheCreate
()
{
IndexCache
*
indexCacheCreate
(
SIndex
*
idx
,
const
char
*
colName
,
int8_t
type
)
{
IndexCache
*
cache
=
calloc
(
1
,
sizeof
(
IndexCache
));
if
(
cache
==
NULL
)
{
indexError
(
"failed to create index cache"
);
return
NULL
;
}
cache
->
skiplist
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
MAX_INDEX_KEY_LEN
,
compareKey
,
SL_ALLOW_DUP_KEY
,
getIndexKey
);
};
cache
->
mem
=
indexInternalCacheCreate
(
type
);
cache
->
colName
=
calloc
(
1
,
strlen
(
colName
)
+
1
);
memcpy
(
cache
->
colName
,
colName
,
strlen
(
colName
));
cache
->
type
=
type
;
cache
->
index
=
idx
;
cache
->
version
=
0
;
indexCacheRef
(
cache
);
return
cache
;
}
void
indexCacheDebug
(
IndexCache
*
cache
)
{
SSkipListIterator
*
iter
=
tSkipListCreateIter
(
cache
->
skiplist
);
SSkipListIterator
*
iter
=
tSkipListCreateIter
(
cache
->
mem
);
while
(
tSkipListIterNext
(
iter
))
{
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
CacheTerm
*
ct
=
(
CacheTerm
*
)
SL_GET_NODE_DATA
(
node
);
if
(
ct
!=
NULL
)
{
// TODO, add more debug info
indexInfo
(
"{col
Id:%d, colVal: %s, version: %d}
\t
"
,
ct
->
colId
,
ct
->
colVal
,
ct
->
version
);
indexInfo
(
"{col
Val: %s, version: %d}
\t
"
,
ct
->
colVal
,
ct
->
version
);
}
}
tSkipListDestroyIter
(
iter
);
...
...
@@ -129,37 +97,71 @@ void indexCacheDebug(IndexCache* cache) {
void
indexCacheDestroy
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
if
(
pCache
==
NULL
)
{
return
;
}
tSkipListDestroy
(
pCache
->
skiplist
);
tSkipListDestroy
(
pCache
->
mem
);
tSkipListDestroy
(
pCache
->
imm
);
free
(
pCache
->
colName
);
free
(
pCache
);
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
)
{
static
void
doMergeWork
(
SSchedMsg
*
msg
)
{
IndexCache
*
pCache
=
msg
->
ahandle
;
SIndex
*
sidx
=
(
SIndex
*
)
pCache
->
index
;
indexFlushCacheTFile
(
sidx
,
pCache
);
}
int
indexCacheSchedToMerge
(
IndexCache
*
pCache
)
{
SSchedMsg
schedMsg
=
{
0
};
schedMsg
.
fp
=
doMergeWork
;
schedMsg
.
ahandle
=
pCache
;
schedMsg
.
thandle
=
NULL
;
schedMsg
.
msg
=
NULL
;
taosScheduleTask
(
indexQhandle
,
&
schedMsg
);
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
IndexCache
*
pCache
=
cache
;
indexCacheRef
(
pCache
);
// encode data
CacheTerm
*
ct
=
calloc
(
1
,
sizeof
(
CacheTerm
));
if
(
cache
==
NULL
)
{
return
-
1
;
}
// set up key
ct
->
colId
=
colId
;
ct
->
colType
=
term
->
colType
;
ct
->
nColVal
=
term
->
nColVal
;
ct
->
colVal
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
(
ct
->
nColVal
+
1
));
memcpy
(
ct
->
colVal
,
term
->
colVal
,
ct
->
nColVal
);
ct
->
version
=
version
;
ct
->
version
=
atomic_add_fetch_32
(
&
pCache
->
version
,
1
)
;
// set value
ct
->
uid
=
uid
;
ct
->
operaType
=
term
->
operType
;
tSkipListPut
(
pCache
->
skiplist
,
(
char
*
)
ct
);
tSkipListPut
(
pCache
->
mem
,
(
char
*
)
ct
);
pCache
->
nTerm
+=
1
;
if
(
pCache
->
nTerm
>=
CACH_LIMIT
)
{
pCache
->
nTerm
=
0
;
while
(
pCache
->
imm
!=
NULL
)
{
// do nothong
}
pCache
->
imm
=
pCache
->
mem
;
pCache
->
mem
=
indexInternalCacheCreate
(
pCache
->
type
);
// sched to merge
// unref cache int bgwork
indexCacheSchedToMerge
(
pCache
);
}
indexCacheUnRef
(
pCache
);
return
0
;
// encode end
}
int
indexCacheDel
(
void
*
cache
,
int32_t
fieldId
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
int
indexCacheDel
(
void
*
cache
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
IndexCache
*
pCache
=
cache
;
return
0
;
}
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
)
{
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
IndexCache
*
pCache
=
cache
;
SIndexTerm
*
term
=
query
->
term
;
...
...
@@ -167,15 +169,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t
CacheTerm
*
ct
=
calloc
(
1
,
sizeof
(
CacheTerm
));
if
(
ct
==
NULL
)
{
return
-
1
;
}
ct
->
colId
=
colId
;
ct
->
nColVal
=
term
->
nColVal
;
ct
->
colVal
=
calloc
(
1
,
sizeof
(
char
)
*
(
ct
->
nColVal
+
1
));
memcpy
(
ct
->
colVal
,
term
->
colVal
,
ct
->
nColVal
);
ct
->
version
=
version
;
ct
->
version
=
atomic_load_32
(
&
pCache
->
version
)
;
char
*
key
=
getIndexKey
(
ct
);
// TODO handle multi situation later, and refactor
SSkipListIterator
*
iter
=
tSkipListCreateIterFromVal
(
pCache
->
skiplist
,
key
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
SSkipListIterator
*
iter
=
tSkipListCreateIterFromVal
(
pCache
->
mem
,
key
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
while
(
tSkipListIterNext
(
iter
))
{
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
if
(
node
!=
NULL
)
{
...
...
@@ -209,3 +210,12 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t
}
return
0
;
}
void
indexCacheRef
(
IndexCache
*
cache
)
{
int
ref
=
T_REF_INC
(
cache
);
UNUSED
(
ref
);
}
void
indexCacheUnRef
(
IndexCache
*
cache
)
{
int
ref
=
T_REF_DEC
(
cache
);
if
(
ref
==
0
)
{
indexCacheDestroy
(
cache
);
}
}
source/libs/index/src/index_tfile.c
浏览文件 @
9fe46510
...
...
@@ -33,11 +33,9 @@ static int tfileWriteHeader(TFileWriter* writer);
static
int
tfileWriteFstOffset
(
TFileWriter
*
tw
,
int32_t
offset
);
static
int
tfileWriteData
(
TFileWriter
*
write
,
TFileValue
*
tval
);
static
int
tfileReaderLoadHeader
(
TFileReader
*
reader
);
static
int
tfileReaderLoadFst
(
TFileReader
*
reader
);
static
int
tfileReaderLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
);
static
void
tfileReaderRef
(
TFileReader
*
reader
);
static
void
tfileReaderUnRef
(
TFileReader
*
reader
);
static
int
tfileReaderLoadHeader
(
TFileReader
*
reader
);
static
int
tfileReaderLoadFst
(
TFileReader
*
reader
);
static
int
tfileReaderLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
);
static
int
tfileGetFileList
(
const
char
*
path
,
SArray
*
result
);
static
int
tfileRmExpireFile
(
SArray
*
result
);
...
...
@@ -131,7 +129,6 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
taosHashPut
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
),
&
reader
,
sizeof
(
void
*
));
return
;
}
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
)
{
TFileReader
*
reader
=
calloc
(
1
,
sizeof
(
TFileReader
));
if
(
reader
==
NULL
)
{
return
NULL
;
}
...
...
@@ -317,6 +314,11 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
return
0
;
}
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
char
*
colName
)
{
if
(
tf
==
NULL
)
{
return
NULL
;
}
TFileCacheKey
key
=
{.
suid
=
0
,
.
colType
=
TSDB_DATA_TYPE_BINARY
,
.
colName
=
colName
,
.
nColName
=
strlen
(
colName
)};
return
tfileCacheGet
(
tf
->
cache
,
&
key
);
}
static
int
tfileStrCompare
(
const
void
*
a
,
const
void
*
b
)
{
int
ret
=
strcmp
((
char
*
)
a
,
(
char
*
)
b
);
...
...
@@ -423,12 +425,12 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
free
(
buf
);
return
0
;
}
static
void
tfileReaderRef
(
TFileReader
*
reader
)
{
void
tfileReaderRef
(
TFileReader
*
reader
)
{
int
ref
=
T_REF_INC
(
reader
);
UNUSED
(
ref
);
}
static
void
tfileReaderUnRef
(
TFileReader
*
reader
)
{
void
tfileReaderUnRef
(
TFileReader
*
reader
)
{
int
ref
=
T_REF_DEC
(
reader
);
if
(
ref
==
0
)
{
tfileReaderDestroy
(
reader
);
}
}
...
...
@@ -479,9 +481,9 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
return
-
1
;
}
static
void
tfileSerialCacheKey
(
TFileCacheKey
*
key
,
char
*
buf
)
{
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
suid
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
colType
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
//
SERIALIZE_MEM_TO_BUF(buf, key, suid);
//
SERIALIZE_VAR_TO_BUF(buf, '_', char);
//
SERIALIZE_MEM_TO_BUF(buf, key, colType);
//
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF
(
buf
,
key
,
colName
,
key
->
nColName
);
}
source/libs/index/test/indexTests.cc
浏览文件 @
9fe46510
...
...
@@ -471,10 +471,10 @@ class CacheObj {
public:
CacheObj
()
{
// TODO
cache
=
indexCacheCreate
();
cache
=
indexCacheCreate
(
NULL
,
"voltage"
,
TSDB_DATA_TYPE_BINARY
);
}
int
Put
(
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
)
{
int
ret
=
indexCachePut
(
cache
,
term
,
colId
,
version
,
uid
);
int
ret
=
indexCachePut
(
cache
,
term
,
uid
);
if
(
ret
!=
0
)
{
//
std
::
cout
<<
"failed to put into cache: "
<<
ret
<<
std
::
endl
;
...
...
@@ -486,7 +486,7 @@ class CacheObj {
indexCacheDebug
(
cache
);
}
int
Get
(
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
)
{
int
ret
=
indexCacheSearch
(
cache
,
query
,
colId
,
version
,
result
,
s
);
int
ret
=
indexCacheSearch
(
cache
,
query
,
result
,
s
);
if
(
ret
!=
0
)
{
//
std
::
cout
<<
"failed to get from cache:"
<<
ret
<<
std
::
endl
;
...
...
@@ -561,7 +561,7 @@ TEST_F(IndexCacheEnv, cache_test) {
}
{
std
::
string
colVal
(
"v4"
);
for
(
size_t
i
=
0
;
i
<
10
0
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
10
;
i
++
)
{
colVal
[
colVal
.
size
()
-
1
]
=
'a'
+
i
;
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
...
...
@@ -578,7 +578,8 @@ TEST_F(IndexCacheEnv, cache_test) {
STermValueType
valType
;
coj
->
Get
(
&
query
,
colId
,
10000
,
ret
,
&
valType
);
assert
(
taosArrayGetSize
(
ret
)
==
3
);
// std::cout << "size : " << taosArrayGetSize(ret) << std::endl;
assert
(
taosArrayGetSize
(
ret
)
==
4
);
}
{
std
::
string
colVal
(
"v2"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录