Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
511c39a0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
511c39a0
编写于
7月 13, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): add skiplist entry cache
上级
e91ca30b
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
633 addition
and
221 deletion
+633
-221
source/libs/sync/inc/syncRaftEntry.h
source/libs/sync/inc/syncRaftEntry.h
+39
-15
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+258
-16
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+1
-1
source/libs/sync/test/CMakeLists.txt
source/libs/sync/test/CMakeLists.txt
+14
-0
source/libs/sync/test/syncEntryCacheTest.cpp
source/libs/sync/test/syncEntryCacheTest.cpp
+44
-189
source/libs/sync/test/syncHashCacheTest.cpp
source/libs/sync/test/syncHashCacheTest.cpp
+277
-0
未找到文件。
source/libs/sync/inc/syncRaftEntry.h
浏览文件 @
511c39a0
...
@@ -26,6 +26,7 @@ extern "C" {
...
@@ -26,6 +26,7 @@ extern "C" {
#include "syncInt.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncMessage.h"
#include "taosdef.h"
#include "taosdef.h"
#include "tskiplist.h"
typedef
struct
SSyncRaftEntry
{
typedef
struct
SSyncRaftEntry
{
uint32_t
bytes
;
uint32_t
bytes
;
...
@@ -58,29 +59,52 @@ void syncEntryLog(const SSyncRaftEntry* pObj);
...
@@ -58,29 +59,52 @@ void syncEntryLog(const SSyncRaftEntry* pObj);
void
syncEntryLog2
(
char
*
s
,
const
SSyncRaftEntry
*
pObj
);
void
syncEntryLog2
(
char
*
s
,
const
SSyncRaftEntry
*
pObj
);
//-----------------------------------
//-----------------------------------
typedef
struct
SRaftEntryCache
{
typedef
struct
SRaftEntry
Hash
Cache
{
SHashObj
*
pEntryHash
;
SHashObj
*
pEntryHash
;
int32_t
maxCount
;
int32_t
maxCount
;
int32_t
currentCount
;
int32_t
currentCount
;
TdThreadMutex
mutex
;
TdThreadMutex
mutex
;
SSyncNode
*
pSyncNode
;
SSyncNode
*
pSyncNode
;
}
SRaftEntryHashCache
;
SRaftEntryHashCache
*
raftCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
);
void
raftCacheDestroy
(
SRaftEntryHashCache
*
pCache
);
int32_t
raftCachePutEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SSyncRaftEntry
*
pEntry
);
int32_t
raftCacheGetEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheGetEntryP
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheDelEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
);
int32_t
raftCacheGetAndDel
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheClear
(
struct
SRaftEntryHashCache
*
pCache
);
cJSON
*
raftCache2Json
(
SRaftEntryHashCache
*
pObj
);
char
*
raftCache2Str
(
SRaftEntryHashCache
*
pObj
);
void
raftCachePrint
(
SRaftEntryHashCache
*
pObj
);
void
raftCachePrint2
(
char
*
s
,
SRaftEntryHashCache
*
pObj
);
void
raftCacheLog
(
SRaftEntryHashCache
*
pObj
);
void
raftCacheLog2
(
char
*
s
,
SRaftEntryHashCache
*
pObj
);
//-----------------------------------
typedef
struct
SRaftEntryCache
{
SSkipList
*
pSkipList
;
int32_t
maxCount
;
int32_t
currentCount
;
TdThreadMutex
mutex
;
SSyncNode
*
pSyncNode
;
}
SRaftEntryCache
;
}
SRaftEntryCache
;
SRaftEntryCache
*
raftCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
);
SRaftEntryCache
*
raftEntryCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
);
void
raftCacheDestroy
(
SRaftEntryCache
*
pCache
);
void
raftEntryCacheDestroy
(
SRaftEntryCache
*
pCache
);
int32_t
raftCachePutEntry
(
struct
SRaftEntryCache
*
pCache
,
SSyncRaftEntry
*
pEntry
);
int32_t
raftEntryCachePutEntry
(
struct
SRaftEntryCache
*
pCache
,
SSyncRaftEntry
*
pEntry
);
int32_t
raftCacheGetEntry
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftEntryCacheGetEntry
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheGetEntryP
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftEntryCacheGetEntryP
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheDelEntry
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
);
int32_t
raftEntryCacheClear
(
struct
SRaftEntryCache
*
pCache
,
int32_t
count
);
int32_t
raftCacheGetAndDel
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheClear
(
struct
SRaftEntryCache
*
pCache
);
cJSON
*
raftCache2Json
(
SRaftEntryCache
*
pObj
);
cJSON
*
raft
Entry
Cache2Json
(
SRaftEntryCache
*
pObj
);
char
*
raftCache2Str
(
SRaftEntryCache
*
pObj
);
char
*
raft
Entry
Cache2Str
(
SRaftEntryCache
*
pObj
);
void
raftCachePrint
(
SRaftEntryCache
*
pObj
);
void
raft
Entry
CachePrint
(
SRaftEntryCache
*
pObj
);
void
raftCachePrint2
(
char
*
s
,
SRaftEntryCache
*
pObj
);
void
raft
Entry
CachePrint2
(
char
*
s
,
SRaftEntryCache
*
pObj
);
void
raftCacheLog
(
SRaftEntryCache
*
pObj
);
void
raft
Entry
CacheLog
(
SRaftEntryCache
*
pObj
);
void
raftCacheLog2
(
char
*
s
,
SRaftEntryCache
*
pObj
);
void
raft
Entry
CacheLog2
(
char
*
s
,
SRaftEntryCache
*
pObj
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
511c39a0
...
@@ -198,8 +198,8 @@ void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
...
@@ -198,8 +198,8 @@ void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
}
}
//-----------------------------------
//-----------------------------------
SRaftEntryCache
*
raftCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
)
{
SRaftEntry
Hash
Cache
*
raftCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
)
{
SRaftEntry
Cache
*
pCache
=
taosMemoryMalloc
(
sizeof
(
SRaftEntry
Cache
));
SRaftEntry
HashCache
*
pCache
=
taosMemoryMalloc
(
sizeof
(
SRaftEntryHash
Cache
));
if
(
pCache
==
NULL
)
{
if
(
pCache
==
NULL
)
{
sError
(
"vgId:%d raft cache create error"
,
pSyncNode
->
vgId
);
sError
(
"vgId:%d raft cache create error"
,
pSyncNode
->
vgId
);
return
NULL
;
return
NULL
;
...
@@ -220,7 +220,7 @@ SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
...
@@ -220,7 +220,7 @@ SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
return
pCache
;
return
pCache
;
}
}
void
raftCacheDestroy
(
SRaftEntryCache
*
pCache
)
{
void
raftCacheDestroy
(
SRaftEntry
Hash
Cache
*
pCache
)
{
if
(
pCache
!=
NULL
)
{
if
(
pCache
!=
NULL
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
taosHashCleanup
(
pCache
->
pEntryHash
);
taosHashCleanup
(
pCache
->
pEntryHash
);
...
@@ -233,7 +233,7 @@ void raftCacheDestroy(SRaftEntryCache* pCache) {
...
@@ -233,7 +233,7 @@ void raftCacheDestroy(SRaftEntryCache* pCache) {
// success, return 1
// success, return 1
// max count, return 0
// max count, return 0
// error, return -1
// error, return -1
int32_t
raftCachePutEntry
(
struct
SRaftEntryCache
*
pCache
,
SSyncRaftEntry
*
pEntry
)
{
int32_t
raftCachePutEntry
(
struct
SRaftEntry
Hash
Cache
*
pCache
,
SSyncRaftEntry
*
pEntry
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
if
(
pCache
->
currentCount
>=
pCache
->
maxCount
)
{
if
(
pCache
->
currentCount
>=
pCache
->
maxCount
)
{
...
@@ -259,7 +259,7 @@ int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry
...
@@ -259,7 +259,7 @@ int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry
// success, return 0
// success, return 0
// error, return -1
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
int32_t
raftCacheGetEntry
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
int32_t
raftCacheGetEntry
(
struct
SRaftEntry
Hash
Cache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
if
(
ppEntry
==
NULL
)
{
if
(
ppEntry
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -292,7 +292,7 @@ int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSync
...
@@ -292,7 +292,7 @@ int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSync
// success, return 0
// success, return 0
// error, return -1
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
int32_t
raftCacheGetEntryP
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
int32_t
raftCacheGetEntryP
(
struct
SRaftEntry
Hash
Cache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
if
(
ppEntry
==
NULL
)
{
if
(
ppEntry
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -321,7 +321,7 @@ int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyn
...
@@ -321,7 +321,7 @@ int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyn
return
-
1
;
return
-
1
;
}
}
int32_t
raftCacheDelEntry
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
)
{
int32_t
raftCacheDelEntry
(
struct
SRaftEntry
Hash
Cache
*
pCache
,
SyncIndex
index
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
taosHashRemove
(
pCache
->
pEntryHash
,
&
index
,
sizeof
(
index
));
taosHashRemove
(
pCache
->
pEntryHash
,
&
index
,
sizeof
(
index
));
--
(
pCache
->
currentCount
);
--
(
pCache
->
currentCount
);
...
@@ -329,7 +329,7 @@ int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index) {
...
@@ -329,7 +329,7 @@ int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index) {
return
0
;
return
0
;
}
}
int32_t
raftCacheGetAndDel
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
int32_t
raftCacheGetAndDel
(
struct
SRaftEntry
Hash
Cache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
if
(
ppEntry
==
NULL
)
{
if
(
ppEntry
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -362,7 +362,7 @@ int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyn
...
@@ -362,7 +362,7 @@ int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyn
return
-
1
;
return
-
1
;
}
}
int32_t
raftCacheClear
(
struct
SRaftEntryCache
*
pCache
)
{
int32_t
raftCacheClear
(
struct
SRaftEntry
Hash
Cache
*
pCache
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
taosHashClear
(
pCache
->
pEntryHash
);
taosHashClear
(
pCache
->
pEntryHash
);
pCache
->
currentCount
=
0
;
pCache
->
currentCount
=
0
;
...
@@ -371,7 +371,7 @@ int32_t raftCacheClear(struct SRaftEntryCache* pCache) {
...
@@ -371,7 +371,7 @@ int32_t raftCacheClear(struct SRaftEntryCache* pCache) {
}
}
//-----------------------------------
//-----------------------------------
cJSON
*
raftCache2Json
(
SRaftEntryCache
*
pCache
)
{
cJSON
*
raftCache2Json
(
SRaftEntry
Hash
Cache
*
pCache
)
{
char
u64buf
[
128
]
=
{
0
};
char
u64buf
[
128
]
=
{
0
};
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
...
@@ -402,41 +402,283 @@ cJSON* raftCache2Json(SRaftEntryCache* pCache) {
...
@@ -402,41 +402,283 @@ cJSON* raftCache2Json(SRaftEntryCache* pCache) {
}
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SRaftEntryCache"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SRaftEntry
Hash
Cache"
,
pRoot
);
return
pJson
;
return
pJson
;
}
}
char
*
raftCache2Str
(
SRaftEntryCache
*
pCache
)
{
char
*
raftCache2Str
(
SRaftEntry
Hash
Cache
*
pCache
)
{
cJSON
*
pJson
=
raftCache2Json
(
pCache
);
cJSON
*
pJson
=
raftCache2Json
(
pCache
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
return
serialized
;
}
}
void
raftCachePrint
(
SRaftEntryCache
*
pCache
)
{
void
raftCachePrint
(
SRaftEntry
Hash
Cache
*
pCache
)
{
char
*
serialized
=
raftCache2Str
(
pCache
);
char
*
serialized
=
raftCache2Str
(
pCache
);
printf
(
"raftCachePrint | len:%"
PRIu64
" | %s
\n
"
,
strlen
(
serialized
),
serialized
);
printf
(
"raftCachePrint | len:%"
PRIu64
" | %s
\n
"
,
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
taosMemoryFree
(
serialized
);
}
}
void
raftCachePrint2
(
char
*
s
,
SRaftEntryCache
*
pCache
)
{
void
raftCachePrint2
(
char
*
s
,
SRaftEntry
Hash
Cache
*
pCache
)
{
char
*
serialized
=
raftCache2Str
(
pCache
);
char
*
serialized
=
raftCache2Str
(
pCache
);
printf
(
"raftCachePrint2 | len:%"
PRIu64
" | %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
printf
(
"raftCachePrint2 | len:%"
PRIu64
" | %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
taosMemoryFree
(
serialized
);
}
}
void
raftCacheLog
(
SRaftEntryCache
*
pCache
)
{
void
raftCacheLog
(
SRaftEntry
Hash
Cache
*
pCache
)
{
char
*
serialized
=
raftCache2Str
(
pCache
);
char
*
serialized
=
raftCache2Str
(
pCache
);
sTrace
(
"raftCacheLog | len:%"
PRIu64
" | %s"
,
strlen
(
serialized
),
serialized
);
sTrace
(
"raftCacheLog | len:%"
PRIu64
" | %s"
,
strlen
(
serialized
),
serialized
);
taosMemoryFree
(
serialized
);
taosMemoryFree
(
serialized
);
}
}
void
raftCacheLog2
(
char
*
s
,
SRaftEntryCache
*
pCache
)
{
void
raftCacheLog2
(
char
*
s
,
SRaftEntry
Hash
Cache
*
pCache
)
{
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
serialized
=
raftCache2Str
(
pCache
);
char
*
serialized
=
raftCache2Str
(
pCache
);
sTraceLong
(
"raftCacheLog2 | len:%"
PRIu64
" | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
sTraceLong
(
"raftCacheLog2 | len:%"
PRIu64
" | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
taosMemoryFree
(
serialized
);
}
}
}
//-----------------------------------
static
char
*
keyFn
(
const
void
*
pData
)
{
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
pData
;
return
(
char
*
)(
&
(
pEntry
->
index
));
}
static
int
cmpFn
(
const
void
*
p1
,
const
void
*
p2
)
{
return
memcmp
(
p1
,
p2
,
sizeof
(
SyncIndex
));
}
SRaftEntryCache
*
raftEntryCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
)
{
SRaftEntryCache
*
pCache
=
taosMemoryMalloc
(
sizeof
(
SRaftEntryCache
));
if
(
pCache
==
NULL
)
{
sError
(
"vgId:%d raft cache create error"
,
pSyncNode
->
vgId
);
return
NULL
;
}
pCache
->
pSkipList
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
sizeof
(
SyncIndex
),
cmpFn
,
SL_ALLOW_DUP_KEY
,
keyFn
);
if
(
pCache
->
pSkipList
==
NULL
)
{
sError
(
"vgId:%d raft cache create hash error"
,
pSyncNode
->
vgId
);
return
NULL
;
}
taosThreadMutexInit
(
&
(
pCache
->
mutex
),
NULL
);
pCache
->
maxCount
=
maxCount
;
pCache
->
currentCount
=
0
;
pCache
->
pSyncNode
=
pSyncNode
;
return
pCache
;
}
void
raftEntryCacheDestroy
(
SRaftEntryCache
*
pCache
)
{
if
(
pCache
!=
NULL
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
tSkipListDestroy
(
pCache
->
pSkipList
);
taosThreadMutexUnlock
(
&
(
pCache
->
mutex
));
taosThreadMutexDestroy
(
&
(
pCache
->
mutex
));
taosMemoryFree
(
pCache
);
}
}
// success, return 1
// max count, return 0
// error, return -1
int32_t
raftEntryCachePutEntry
(
struct
SRaftEntryCache
*
pCache
,
SSyncRaftEntry
*
pEntry
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
if
(
pCache
->
currentCount
>=
pCache
->
maxCount
)
{
taosThreadMutexUnlock
(
&
(
pCache
->
mutex
));
return
0
;
}
SSkipListNode
*
pSkipListNode
=
tSkipListPut
(
pCache
->
pSkipList
,
pEntry
);
ASSERT
(
pSkipListNode
!=
NULL
);
++
(
pCache
->
currentCount
);
do
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"raft cache add, type:%s,%d, type2:%s,%d, index:%"
PRId64
", bytes:%d"
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
,
pEntry
->
index
,
pEntry
->
bytes
);
syncNodeEventLog
(
pCache
->
pSyncNode
,
eventLog
);
}
while
(
0
);
taosThreadMutexUnlock
(
&
(
pCache
->
mutex
));
return
1
;
}
// find one, return 1
// not found, return 0
// error, return -1
int32_t
raftEntryCacheGetEntry
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
ASSERT
(
ppEntry
!=
NULL
);
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
raftEntryCacheGetEntryP
(
pCache
,
index
,
&
pEntry
);
if
(
code
==
1
)
{
*
ppEntry
=
taosMemoryMalloc
(
pEntry
->
bytes
);
memcpy
(
*
ppEntry
,
pEntry
,
pEntry
->
bytes
);
}
else
{
*
ppEntry
=
NULL
;
}
return
code
;
}
// find one, return 1
// not found, return 0
// error, return -1
int32_t
raftEntryCacheGetEntryP
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
SyncIndex
index2
=
index
;
int32_t
code
=
0
;
SArray
*
entryPArray
=
tSkipListGet
(
pCache
->
pSkipList
,
(
char
*
)(
&
index2
));
int32_t
arraySize
=
taosArrayGetSize
(
entryPArray
);
if
(
arraySize
==
1
)
{
SSkipListNode
**
ppNode
=
(
SSkipListNode
**
)
taosArrayGet
(
entryPArray
,
0
);
ASSERT
(
*
ppNode
!=
NULL
);
*
ppEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
*
ppNode
);
code
=
1
;
}
else
if
(
arraySize
==
0
)
{
code
=
0
;
}
else
{
ASSERT
(
0
);
code
=
-
1
;
}
taosArrayDestroy
(
entryPArray
);
taosThreadMutexUnlock
(
&
(
pCache
->
mutex
));
return
code
;
}
// count = -1, clear all
// count >= 0, clear count
// return -1, error
// return delete count
int32_t
raftEntryCacheClear
(
struct
SRaftEntryCache
*
pCache
,
int32_t
count
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
int32_t
returnCnt
=
0
;
if
(
count
==
-
1
)
{
// clear all
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pCache
->
pSkipList
);
while
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
syncEntryDestory
(
pEntry
);
++
returnCnt
;
}
tSkipListDestroyIter
(
pIter
);
tSkipListDestroy
(
pCache
->
pSkipList
);
pCache
->
pSkipList
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
sizeof
(
SyncIndex
),
cmpFn
,
SL_ALLOW_DUP_KEY
,
keyFn
);
ASSERT
(
pCache
->
pSkipList
!=
NULL
);
}
else
{
// clear count
int
i
=
0
;
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pCache
->
pSkipList
);
SArray
*
delNodeArray
=
taosArrayInit
(
0
,
sizeof
(
SSkipListNode
*
));
// free entry
while
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
if
(
i
++
>=
count
)
{
break
;
}
// sDebug("push pNode:%p", pNode);
taosArrayPush
(
delNodeArray
,
&
pNode
);
++
returnCnt
;
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
syncEntryDestory
(
pEntry
);
}
tSkipListDestroyIter
(
pIter
);
// delete skiplist node
int32_t
arraySize
=
taosArrayGetSize
(
delNodeArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSkipListNode
**
ppNode
=
taosArrayGet
(
delNodeArray
,
i
);
// sDebug("get pNode:%p", *ppNode);
tSkipListRemoveNode
(
pCache
->
pSkipList
,
*
ppNode
);
}
taosArrayDestroy
(
delNodeArray
);
}
pCache
->
currentCount
-=
returnCnt
;
taosThreadMutexUnlock
(
&
(
pCache
->
mutex
));
return
returnCnt
;
}
cJSON
*
raftEntryCache2Json
(
SRaftEntryCache
*
pCache
)
{
char
u64buf
[
128
]
=
{
0
};
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pCache
!=
NULL
)
{
taosThreadMutexLock
(
&
(
pCache
->
mutex
));
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pCache
->
pSyncNode
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"currentCount"
,
pCache
->
currentCount
);
cJSON_AddNumberToObject
(
pRoot
,
"maxCount"
,
pCache
->
maxCount
);
cJSON
*
pEntries
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"entries"
,
pEntries
);
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pCache
->
pSkipList
);
while
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
cJSON_AddItemToArray
(
pEntries
,
syncEntry2Json
(
pEntry
));
}
tSkipListDestroyIter
(
pIter
);
taosThreadMutexUnlock
(
&
(
pCache
->
mutex
));
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SRaftEntryCache"
,
pRoot
);
return
pJson
;
}
char
*
raftEntryCache2Str
(
SRaftEntryCache
*
pObj
)
{
cJSON
*
pJson
=
raftEntryCache2Json
(
pObj
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
void
raftEntryCachePrint
(
SRaftEntryCache
*
pObj
)
{
char
*
serialized
=
raftEntryCache2Str
(
pObj
);
printf
(
"raftEntryCachePrint | len:%"
PRIu64
" | %s
\n
"
,
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
raftEntryCachePrint2
(
char
*
s
,
SRaftEntryCache
*
pObj
)
{
char
*
serialized
=
raftEntryCache2Str
(
pObj
);
printf
(
"raftEntryCachePrint2 | len:%"
PRIu64
" | %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
raftEntryCacheLog
(
SRaftEntryCache
*
pObj
)
{
char
*
serialized
=
raftEntryCache2Str
(
pObj
);
sTrace
(
"raftEntryCacheLog | len:%"
PRIu64
" | %s"
,
strlen
(
serialized
),
serialized
);
taosMemoryFree
(
serialized
);
}
void
raftEntryCacheLog2
(
char
*
s
,
SRaftEntryCache
*
pObj
)
{
if
(
gRaftDetailLog
)
{
char
*
serialized
=
raftEntryCache2Str
(
pObj
);
sTraceLong
(
"raftEntryCacheLog2 | len:%"
PRIu64
" | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
}
}
\ No newline at end of file
source/libs/sync/src/syncRespMgr.c
浏览文件 @
511c39a0
...
@@ -127,7 +127,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
...
@@ -127,7 +127,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
while
(
pStub
)
{
while
(
pStub
)
{
size_t
len
;
size_t
len
;
void
*
key
=
taosHashGetKey
(
pStub
,
&
len
);
void
*
key
=
taosHashGetKey
(
pStub
,
&
len
);
uint64_t
*
pSeqNum
=
(
uint64_t
*
)
key
;
uint64_t
*
pSeqNum
=
(
uint64_t
*
)
key
;
int64_t
nowMS
=
taosGetTimestampMs
();
int64_t
nowMS
=
taosGetTimestampMs
();
...
...
source/libs/sync/test/CMakeLists.txt
浏览文件 @
511c39a0
...
@@ -18,6 +18,7 @@ add_executable(syncIndexMgrTest "")
...
@@ -18,6 +18,7 @@ add_executable(syncIndexMgrTest "")
add_executable
(
syncLogStoreTest
""
)
add_executable
(
syncLogStoreTest
""
)
add_executable
(
syncEntryTest
""
)
add_executable
(
syncEntryTest
""
)
add_executable
(
syncEntryCacheTest
""
)
add_executable
(
syncEntryCacheTest
""
)
add_executable
(
syncHashCacheTest
""
)
add_executable
(
syncRequestVoteTest
""
)
add_executable
(
syncRequestVoteTest
""
)
add_executable
(
syncRequestVoteReplyTest
""
)
add_executable
(
syncRequestVoteReplyTest
""
)
add_executable
(
syncAppendEntriesTest
""
)
add_executable
(
syncAppendEntriesTest
""
)
...
@@ -137,6 +138,10 @@ target_sources(syncEntryCacheTest
...
@@ -137,6 +138,10 @@ target_sources(syncEntryCacheTest
PRIVATE
PRIVATE
"syncEntryCacheTest.cpp"
"syncEntryCacheTest.cpp"
)
)
target_sources
(
syncHashCacheTest
PRIVATE
"syncHashCacheTest.cpp"
)
target_sources
(
syncRequestVoteTest
target_sources
(
syncRequestVoteTest
PRIVATE
PRIVATE
"syncRequestVoteTest.cpp"
"syncRequestVoteTest.cpp"
...
@@ -387,6 +392,11 @@ target_include_directories(syncEntryCacheTest
...
@@ -387,6 +392,11 @@ target_include_directories(syncEntryCacheTest
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
)
target_include_directories
(
syncHashCacheTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncRequestVoteTest
target_include_directories
(
syncRequestVoteTest
PUBLIC
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
...
@@ -654,6 +664,10 @@ target_link_libraries(syncEntryCacheTest
...
@@ -654,6 +664,10 @@ target_link_libraries(syncEntryCacheTest
sync
sync
gtest_main
gtest_main
)
)
target_link_libraries
(
syncHashCacheTest
sync
gtest_main
)
target_link_libraries
(
syncRequestVoteTest
target_link_libraries
(
syncRequestVoteTest
sync
sync
gtest_main
gtest_main
...
...
source/libs/sync/test/syncEntryCacheTest.cpp
浏览文件 @
511c39a0
...
@@ -43,222 +43,82 @@ SRaftEntryCache* createCache(int maxCount) {
...
@@ -43,222 +43,82 @@ SRaftEntryCache* createCache(int maxCount) {
SSyncNode
*
pSyncNode
=
createFakeNode
();
SSyncNode
*
pSyncNode
=
createFakeNode
();
ASSERT
(
pSyncNode
!=
NULL
);
ASSERT
(
pSyncNode
!=
NULL
);
SRaftEntryCache
*
pCache
=
raftCacheCreate
(
pSyncNode
,
maxCount
);
SRaftEntryCache
*
pCache
=
raft
Entry
CacheCreate
(
pSyncNode
,
maxCount
);
ASSERT
(
pCache
!=
NULL
);
ASSERT
(
pCache
!=
NULL
);
return
pCache
;
return
pCache
;
}
}
void
test1
()
{
void
test1
()
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SRaftEntryCache
*
pCache
=
createCache
(
5
);
SRaftEntryCache
*
pCache
=
createCache
(
5
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
code
=
raftEntryCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
1
);
sTrace
(
"put entry code:%d, pEntry:%p"
,
code
,
pEntry
);
syncEntryDestory
(
pEntry
);
}
}
raftCacheLog2
((
char
*
)
"==test1 write 5 entries=="
,
pCache
);
raft
Entry
CacheLog2
((
char
*
)
"==test1 write 5 entries=="
,
pCache
);
SyncIndex
index
;
raftEntryCacheClear
(
pCache
,
3
);
index
=
1
;
raftEntryCacheLog2
((
char
*
)
"==test1 evict 3 entries=="
,
pCache
);
code
=
raftCacheDelEntry
(
pCache
,
index
);
ASSERT
(
code
==
0
);
index
=
3
;
code
=
raftCacheDelEntry
(
pCache
,
index
);
ASSERT
(
code
==
0
);
raftCacheLog2
((
char
*
)
"==test1 delete 1,3=="
,
pCache
);
code
=
raftCacheClear
(
pCache
);
raftEntryCacheClear
(
pCache
,
-
1
);
ASSERT
(
code
==
0
);
raftEntryCacheLog2
((
char
*
)
"==test1 evict -1(all) entries=="
,
pCache
);
raftCacheLog2
((
char
*
)
"==clear all=="
,
pCache
);
}
}
void
test2
()
{
void
test2
()
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SRaftEntryCache
*
pCache
=
createCache
(
5
);
SRaftEntryCache
*
pCache
=
createCache
(
5
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
code
=
raftEntryCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
1
);
sTrace
(
"put entry code:%d, pEntry:%p"
,
code
,
pEntry
);
syncEntryDestory
(
pEntry
);
}
}
raft
CacheLog2
((
char
*
)
"==test2
write 5 entries=="
,
pCache
);
raft
EntryCacheLog2
((
char
*
)
"==test1
write 5 entries=="
,
pCache
);
SyncIndex
index
;
SyncIndex
index
=
2
;
index
=
1
;
SSyncRaftEntry
*
pEntry
=
NULL
;
SSyncRaftEntry
*
pEntry
;
code
=
raftCacheGetEntry
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
0
);
syncEntryDestory
(
pEntry
);
syncEntryLog2
((
char
*
)
"==test2 get entry 1=="
,
pEntry
);
index
=
2
;
code
=
raftEntryCacheGetEntryP
(
pCache
,
index
,
&
pEntry
)
;
code
=
raftCacheGetEntryP
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
1
&&
index
==
pEntry
->
index
);
ASSERT
(
code
==
0
);
sTrace
(
"get entry:%p for %ld"
,
pEntry
,
index
);
syncEntryLog2
((
char
*
)
"==test2 get entry pointer 2=="
,
pEntry
);
syncEntryLog2
((
char
*
)
"==test2 get entry pointer 2=="
,
pEntry
);
code
=
raftEntryCacheGetEntry
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
1
&&
index
==
pEntry
->
index
);
sTrace
(
"get entry:%p for %ld"
,
pEntry
,
index
);
syncEntryLog2
((
char
*
)
"==test2 get entry 2=="
,
pEntry
);
syncEntryDestory
(
pEntry
);
// not found
// not found
index
=
8
;
index
=
8
;
code
=
raftCacheGetEntry
(
pCache
,
index
,
&
pEntry
);
code
=
raftEntryCacheGetEntry
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
-
1
&&
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
);
ASSERT
(
code
==
0
);
sTrace
(
"get entry:%p for %ld"
,
pEntry
,
index
);
sTrace
(
"==test2 get entry 8 not found=="
);
sTrace
(
"==test2 get entry 8 not found=="
);
// not found
// not found
index
=
9
;
index
=
9
;
code
=
raftCacheGetEntryP
(
pCache
,
index
,
&
pEntry
);
code
=
raftEntryCacheGetEntry
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
-
1
&&
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
);
ASSERT
(
code
==
0
);
sTrace
(
"==test2 get entry pointer 9 not found=="
);
sTrace
(
"get entry:%p for %ld"
,
pEntry
,
index
);
sTrace
(
"==test2 get entry 9 not found=="
);
}
}
void
test3
()
{
void
test3
()
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SRaftEntryCache
*
pCache
=
createCache
(
5
);
SRaftEntryCache
*
pCache
=
createCache
(
20
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
for
(
int
i
=
0
;
i
<=
4
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
1
);
syncEntryDestory
(
pEntry
);
}
for
(
int
i
=
6
;
i
<
10
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
0
);
syncEntryDestory
(
pEntry
);
}
raftCacheLog2
((
char
*
)
"==test3 write 10 entries, max count is 5=="
,
pCache
);
}
void
test4
()
{
int32_t
code
=
0
;
SRaftEntryCache
*
pCache
=
createCache
(
5
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
code
=
raftEntryCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
1
);
sTrace
(
"put entry code:%d, pEntry:%p"
,
code
,
pEntry
);
syncEntryDestory
(
pEntry
);
}
raftCacheLog2
((
char
*
)
"==test4 write 5 entries=="
,
pCache
);
SyncIndex
index
;
index
=
3
;
SSyncRaftEntry
*
pEntry
;
code
=
raftCacheGetAndDel
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
0
);
syncEntryLog2
((
char
*
)
"==test4 get-and-del entry 3=="
,
pEntry
);
raftCacheLog2
((
char
*
)
"==test4 after get-and-del entry 3=="
,
pCache
);
}
static
char
*
keyFn
(
const
void
*
pData
)
{
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
pData
;
return
(
char
*
)(
&
(
pEntry
->
index
));
}
static
int
cmpFn
(
const
void
*
p1
,
const
void
*
p2
)
{
return
memcmp
(
p1
,
p2
,
sizeof
(
SyncIndex
));
}
void
printSkipList
(
SSkipList
*
pSkipList
)
{
ASSERT
(
pSkipList
!=
NULL
);
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pSkipList
);
while
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
syncEntryPrint2
((
char
*
)
""
,
pEntry
);
}
}
void
delSkipListFirst
(
SSkipList
*
pSkipList
,
int
n
)
{
ASSERT
(
pSkipList
!=
NULL
);
sTrace
(
"delete first %d -------------"
,
n
);
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pSkipList
);
for
(
int
i
=
0
;
i
<
n
;
++
i
)
{
tSkipListIterNext
(
pIter
);
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
tSkipListRemoveNode
(
pSkipList
,
pNode
);
}
}
}
SSyncRaftEntry
*
getLogEntry2
(
SSkipList
*
pSkipList
,
SyncIndex
index
)
{
SyncIndex
index2
=
index
;
SSyncRaftEntry
*
pEntry
=
NULL
;
int
arraySize
=
0
;
SArray
*
entryPArray
=
tSkipListGet
(
pSkipList
,
(
char
*
)(
&
index2
));
arraySize
=
taosArrayGetSize
(
entryPArray
);
if
(
arraySize
>
0
)
{
SSkipListNode
**
ppNode
=
(
SSkipListNode
**
)
taosArrayGet
(
entryPArray
,
0
);
ASSERT
(
*
ppNode
!=
NULL
);
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
*
ppNode
);
}
taosArrayDestroy
(
entryPArray
);
sTrace
(
"get index2: %ld, arraySize:%d -------------"
,
index
,
arraySize
);
syncEntryLog2
((
char
*
)
"getLogEntry2"
,
pEntry
);
return
pEntry
;
}
SSyncRaftEntry
*
getLogEntry
(
SSkipList
*
pSkipList
,
SyncIndex
index
)
{
sTrace
(
"get index: %ld -------------"
,
index
);
SyncIndex
index2
=
index
;
SSyncRaftEntry
*
pEntry
=
NULL
;
SSkipListIterator
*
pIter
=
tSkipListCreateIterFromVal
(
pSkipList
,
(
const
char
*
)
&
index2
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
if
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
}
syncEntryLog2
((
char
*
)
"getLogEntry"
,
pEntry
);
return
pEntry
;
}
void
test5
()
{
SSkipList
*
pSkipList
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
sizeof
(
SyncIndex
),
cmpFn
,
SL_ALLOW_DUP_KEY
,
keyFn
);
ASSERT
(
pSkipList
!=
NULL
);
sTrace
(
"insert 9 - 5"
);
for
(
int
i
=
9
;
i
>=
5
;
--
i
)
{
for
(
int
i
=
9
;
i
>=
5
;
--
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
SSkipListNode
*
pSkipListNode
=
tSkipListPut
(
pSkipList
,
pEntry
);
code
=
raftEntryCachePutEntry
(
pCache
,
pEntry
);
sTrace
(
"put entry code:%d, pEntry:%p"
,
code
,
pEntry
);
}
}
raftEntryCacheLog2
((
char
*
)
"==test3 write 10 entries=="
,
pCache
);
sTrace
(
"insert 0 - 4"
);
for
(
int
i
=
0
;
i
<=
4
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
SSkipListNode
*
pSkipListNode
=
tSkipListPut
(
pSkipList
,
pEntry
);
}
sTrace
(
"insert 7 7 7 7 7"
);
for
(
int
i
=
0
;
i
<=
4
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
7
);
SSkipListNode
*
pSkipListNode
=
tSkipListPut
(
pSkipList
,
pEntry
);
}
sTrace
(
"print: -------------"
);
printSkipList
(
pSkipList
);
delSkipListFirst
(
pSkipList
,
3
);
sTrace
(
"print: -------------"
);
printSkipList
(
pSkipList
);
getLogEntry
(
pSkipList
,
2
);
getLogEntry
(
pSkipList
,
5
);
getLogEntry
(
pSkipList
,
7
);
getLogEntry
(
pSkipList
,
7
);
getLogEntry2
(
pSkipList
,
2
);
getLogEntry2
(
pSkipList
,
5
);
getLogEntry2
(
pSkipList
,
7
);
getLogEntry2
(
pSkipList
,
7
);
tSkipListDestroy
(
pSkipList
);
}
}
int
main
(
int
argc
,
char
**
argv
)
{
int
main
(
int
argc
,
char
**
argv
)
{
...
@@ -266,14 +126,9 @@ int main(int argc, char** argv) {
...
@@ -266,14 +126,9 @@ int main(int argc, char** argv) {
tsAsyncLog
=
0
;
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
+
DEBUG_DEBUG
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
+
DEBUG_DEBUG
;
/*
test1
();
test1();
test2
();
test2();
test3
();
test3();
test4();
*/
test5
();
return
0
;
return
0
;
}
}
source/libs/sync/test/syncHashCacheTest.cpp
0 → 100644
浏览文件 @
511c39a0
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "tskiplist.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
SSyncRaftEntry
*
createEntry
(
int
i
)
{
int32_t
dataLen
=
20
;
SSyncRaftEntry
*
pEntry
=
syncEntryBuild
(
dataLen
);
assert
(
pEntry
!=
NULL
);
pEntry
->
msgType
=
88
;
pEntry
->
originalRpcType
=
99
;
pEntry
->
seqNum
=
3
;
pEntry
->
isWeak
=
true
;
pEntry
->
term
=
100
+
i
;
pEntry
->
index
=
i
;
snprintf
(
pEntry
->
data
,
dataLen
,
"value%d"
,
i
);
return
pEntry
;
}
SSyncNode
*
createFakeNode
()
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
SSyncNode
));
ASSERT
(
pSyncNode
!=
NULL
);
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
return
pSyncNode
;
}
SRaftEntryHashCache
*
createCache
(
int
maxCount
)
{
SSyncNode
*
pSyncNode
=
createFakeNode
();
ASSERT
(
pSyncNode
!=
NULL
);
SRaftEntryHashCache
*
pCache
=
raftCacheCreate
(
pSyncNode
,
maxCount
);
ASSERT
(
pCache
!=
NULL
);
return
pCache
;
}
void
test1
()
{
int32_t
code
=
0
;
SRaftEntryHashCache
*
pCache
=
createCache
(
5
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
1
);
syncEntryDestory
(
pEntry
);
}
raftCacheLog2
((
char
*
)
"==test1 write 5 entries=="
,
pCache
);
SyncIndex
index
;
index
=
1
;
code
=
raftCacheDelEntry
(
pCache
,
index
);
ASSERT
(
code
==
0
);
index
=
3
;
code
=
raftCacheDelEntry
(
pCache
,
index
);
ASSERT
(
code
==
0
);
raftCacheLog2
((
char
*
)
"==test1 delete 1,3=="
,
pCache
);
code
=
raftCacheClear
(
pCache
);
ASSERT
(
code
==
0
);
raftCacheLog2
((
char
*
)
"==clear all=="
,
pCache
);
}
void
test2
()
{
int32_t
code
=
0
;
SRaftEntryHashCache
*
pCache
=
createCache
(
5
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
1
);
syncEntryDestory
(
pEntry
);
}
raftCacheLog2
((
char
*
)
"==test2 write 5 entries=="
,
pCache
);
SyncIndex
index
;
index
=
1
;
SSyncRaftEntry
*
pEntry
;
code
=
raftCacheGetEntry
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
0
);
syncEntryDestory
(
pEntry
);
syncEntryLog2
((
char
*
)
"==test2 get entry 1=="
,
pEntry
);
index
=
2
;
code
=
raftCacheGetEntryP
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
0
);
syncEntryLog2
((
char
*
)
"==test2 get entry pointer 2=="
,
pEntry
);
// not found
index
=
8
;
code
=
raftCacheGetEntry
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
-
1
&&
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
);
sTrace
(
"==test2 get entry 8 not found=="
);
// not found
index
=
9
;
code
=
raftCacheGetEntryP
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
-
1
&&
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
);
sTrace
(
"==test2 get entry pointer 9 not found=="
);
}
void
test3
()
{
int32_t
code
=
0
;
SRaftEntryHashCache
*
pCache
=
createCache
(
5
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
1
);
syncEntryDestory
(
pEntry
);
}
for
(
int
i
=
6
;
i
<
10
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
0
);
syncEntryDestory
(
pEntry
);
}
raftCacheLog2
((
char
*
)
"==test3 write 10 entries, max count is 5=="
,
pCache
);
}
void
test4
()
{
int32_t
code
=
0
;
SRaftEntryHashCache
*
pCache
=
createCache
(
5
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
code
=
raftCachePutEntry
(
pCache
,
pEntry
);
ASSERT
(
code
==
1
);
syncEntryDestory
(
pEntry
);
}
raftCacheLog2
((
char
*
)
"==test4 write 5 entries=="
,
pCache
);
SyncIndex
index
;
index
=
3
;
SSyncRaftEntry
*
pEntry
;
code
=
raftCacheGetAndDel
(
pCache
,
index
,
&
pEntry
);
ASSERT
(
code
==
0
);
syncEntryLog2
((
char
*
)
"==test4 get-and-del entry 3=="
,
pEntry
);
raftCacheLog2
((
char
*
)
"==test4 after get-and-del entry 3=="
,
pCache
);
}
static
char
*
keyFn
(
const
void
*
pData
)
{
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
pData
;
return
(
char
*
)(
&
(
pEntry
->
index
));
}
static
int
cmpFn
(
const
void
*
p1
,
const
void
*
p2
)
{
return
memcmp
(
p1
,
p2
,
sizeof
(
SyncIndex
));
}
void
printSkipList
(
SSkipList
*
pSkipList
)
{
ASSERT
(
pSkipList
!=
NULL
);
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pSkipList
);
while
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
syncEntryPrint2
((
char
*
)
""
,
pEntry
);
}
}
void
delSkipListFirst
(
SSkipList
*
pSkipList
,
int
n
)
{
ASSERT
(
pSkipList
!=
NULL
);
sTrace
(
"delete first %d -------------"
,
n
);
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pSkipList
);
for
(
int
i
=
0
;
i
<
n
;
++
i
)
{
tSkipListIterNext
(
pIter
);
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
tSkipListRemoveNode
(
pSkipList
,
pNode
);
}
}
SSyncRaftEntry
*
getLogEntry2
(
SSkipList
*
pSkipList
,
SyncIndex
index
)
{
SyncIndex
index2
=
index
;
SSyncRaftEntry
*
pEntry
=
NULL
;
int
arraySize
=
0
;
SArray
*
entryPArray
=
tSkipListGet
(
pSkipList
,
(
char
*
)(
&
index2
));
arraySize
=
taosArrayGetSize
(
entryPArray
);
if
(
arraySize
>
0
)
{
SSkipListNode
**
ppNode
=
(
SSkipListNode
**
)
taosArrayGet
(
entryPArray
,
0
);
ASSERT
(
*
ppNode
!=
NULL
);
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
*
ppNode
);
}
taosArrayDestroy
(
entryPArray
);
sTrace
(
"get index2: %ld, arraySize:%d -------------"
,
index
,
arraySize
);
syncEntryLog2
((
char
*
)
"getLogEntry2"
,
pEntry
);
return
pEntry
;
}
SSyncRaftEntry
*
getLogEntry
(
SSkipList
*
pSkipList
,
SyncIndex
index
)
{
sTrace
(
"get index: %ld -------------"
,
index
);
SyncIndex
index2
=
index
;
SSyncRaftEntry
*
pEntry
=
NULL
;
SSkipListIterator
*
pIter
=
tSkipListCreateIterFromVal
(
pSkipList
,
(
const
char
*
)
&
index2
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
if
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
}
syncEntryLog2
((
char
*
)
"getLogEntry"
,
pEntry
);
return
pEntry
;
}
void
test5
()
{
SSkipList
*
pSkipList
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
sizeof
(
SyncIndex
),
cmpFn
,
SL_ALLOW_DUP_KEY
,
keyFn
);
ASSERT
(
pSkipList
!=
NULL
);
sTrace
(
"insert 9 - 5"
);
for
(
int
i
=
9
;
i
>=
5
;
--
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
SSkipListNode
*
pSkipListNode
=
tSkipListPut
(
pSkipList
,
pEntry
);
}
sTrace
(
"insert 0 - 4"
);
for
(
int
i
=
0
;
i
<=
4
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
i
);
SSkipListNode
*
pSkipListNode
=
tSkipListPut
(
pSkipList
,
pEntry
);
}
sTrace
(
"insert 7 7 7 7 7"
);
for
(
int
i
=
0
;
i
<=
4
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
createEntry
(
7
);
SSkipListNode
*
pSkipListNode
=
tSkipListPut
(
pSkipList
,
pEntry
);
}
sTrace
(
"print: -------------"
);
printSkipList
(
pSkipList
);
delSkipListFirst
(
pSkipList
,
3
);
sTrace
(
"print: -------------"
);
printSkipList
(
pSkipList
);
getLogEntry
(
pSkipList
,
2
);
getLogEntry
(
pSkipList
,
5
);
getLogEntry
(
pSkipList
,
7
);
getLogEntry
(
pSkipList
,
7
);
getLogEntry2
(
pSkipList
,
2
);
getLogEntry2
(
pSkipList
,
5
);
getLogEntry2
(
pSkipList
,
7
);
getLogEntry2
(
pSkipList
,
7
);
tSkipListDestroy
(
pSkipList
);
}
int
main
(
int
argc
,
char
**
argv
)
{
gRaftDetailLog
=
true
;
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
+
DEBUG_DEBUG
;
/*
test1();
test2();
test3();
test4();
*/
test5
();
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录