Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6a92a4c3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6a92a4c3
编写于
12月 28, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
12月 28, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9436 from taosdata/feature/index_cache
refactor index write
上级
ab75e61b
80718aee
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
38 addition
and
20 deletion
+38
-20
source/libs/index/inc/index_cache.h
source/libs/index/inc/index_cache.h
+2
-1
source/libs/index/src/index.c
source/libs/index/src/index.c
+1
-3
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+35
-16
未找到文件。
source/libs/index/inc/index_cache.h
浏览文件 @
6a92a4c3
...
...
@@ -39,6 +39,7 @@ typedef struct IndexCache {
int32_t
nTerm
;
int8_t
type
;
pthread_mutex_t
mtx
;
}
IndexCache
;
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
...
...
@@ -71,7 +72,7 @@ void indexCacheUnRef(IndexCache* cache);
void
indexCacheDebug
(
IndexCache
*
cache
);
void
indexCacheDestroy
Skiplist
(
SSkipList
*
slt
);
void
indexCacheDestroy
Imm
(
IndexCache
*
cache
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/index/src/index.c
浏览文件 @
6a92a4c3
...
...
@@ -436,9 +436,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if
(
ret
!=
0
)
{
indexError
(
"faile to write into tindex "
);
}
}
// not free later, just put int table cache
SSkipList
*
timm
=
(
SSkipList
*
)
pCache
->
imm
;
pCache
->
imm
=
NULL
;
// or throw int bg thread
indexCacheDestroySkiplist
(
timm
);
indexCacheDestroyImm
(
pCache
);
tfileWriteClose
(
tw
);
indexCacheIteratorDestroy
(
cacheIter
);
...
...
source/libs/index/src/index_cache.c
浏览文件 @
6a92a4c3
...
...
@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define
CACH
_LIMIT 1000000
#define
MEM_TERM
_LIMIT 1000000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
...
...
@@ -78,6 +78,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
cache
->
index
=
idx
;
cache
->
version
=
0
;
pthread_mutex_init
(
&
cache
->
mtx
,
NULL
);
indexCacheRef
(
cache
);
return
cache
;
}
...
...
@@ -103,13 +104,21 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
}
tSkipListDestroyIter
(
iter
);
}
void
indexCacheDestroyImm
(
IndexCache
*
cache
)
{
pthread_mutex_lock
(
&
cache
->
mtx
);
SSkipList
*
timm
=
(
SSkipList
*
)
cache
->
imm
;
cache
->
imm
=
NULL
;
// or throw int bg thread
pthread_mutex_unlock
(
&
cache
->
mtx
);
indexCacheDestroySkiplist
(
timm
);
}
void
indexCacheDestroy
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
if
(
pCache
==
NULL
)
{
return
;
}
tSkipListDestroy
(
pCache
->
mem
);
tSkipListDestroy
(
pCache
->
imm
);
free
(
pCache
->
colName
);
free
(
pCache
);
}
...
...
@@ -170,6 +179,27 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
taosScheduleTask
(
indexQhandle
,
&
schedMsg
);
}
static
void
indexCacheMakeRoomForWrite
(
IndexCache
*
cache
)
{
while
(
true
)
{
if
(
cache
->
nTerm
<
MEM_TERM_LIMIT
)
{
cache
->
nTerm
+=
1
;
break
;
}
else
if
(
cache
->
imm
!=
NULL
)
{
// TODO: wake up by condition variable
pthread_mutex_unlock
(
&
cache
->
mtx
);
taosMsleep
(
50
);
pthread_mutex_lock
(
&
cache
->
mtx
);
}
else
{
cache
->
imm
=
cache
->
mem
;
cache
->
mem
=
indexInternalCacheCreate
(
cache
->
type
);
cache
->
nTerm
=
1
;
// sched to merge
// unref cache in bgwork
indexCacheSchedToMerge
(
cache
);
}
}
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
...
...
@@ -188,23 +218,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
ct
->
uid
=
uid
;
ct
->
operaType
=
term
->
operType
;
// ugly code, refactor later
pthread_mutex_lock
(
&
pCache
->
mtx
);
indexCacheMakeRoomForWrite
(
pCache
);
tSkipListPut
(
pCache
->
mem
,
(
char
*
)
ct
);
pCache
->
nTerm
+=
1
;
if
(
pCache
->
nTerm
>=
CACH_LIMIT
)
{
pCache
->
nTerm
=
0
;
while
(
pCache
->
imm
!=
NULL
)
{
// do nothong
}
pCache
->
imm
=
pCache
->
mem
;
pCache
->
mem
=
indexInternalCacheCreate
(
pCache
->
type
);
pthread_mutex_unlock
(
&
pCache
->
mtx
);
// sched to merge
// unref cache int bgwork
indexCacheSchedToMerge
(
pCache
);
}
indexCacheUnRef
(
pCache
);
return
0
;
// encode end
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录