Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
36a4413b
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
36a4413b
编写于
4月 27, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
merge rocksdb inst
上级
255af841
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
219 addition
and
76 deletion
+219
-76
cmake/cmake.define
cmake/cmake.define
+1
-1
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+21
-3
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+1
-0
include/util/tlist.h
include/util/tlist.h
+1
-0
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+1
-1
source/libs/stream/inc/streamBackendRocksdb.h
source/libs/stream/inc/streamBackendRocksdb.h
+9
-3
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+18
-4
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+93
-8
source/libs/stream/src/streamStateRocksdb.c
source/libs/stream/src/streamStateRocksdb.c
+61
-52
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+4
-4
source/util/src/tlist.c
source/util/src/tlist.c
+9
-0
未找到文件。
cmake/cmake.define
浏览文件 @
36a4413b
cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE O
N
)
set(CMAKE_VERBOSE_MAKEFILE O
FF
)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory
...
...
include/libs/stream/streamState.h
浏览文件 @
36a4413b
...
...
@@ -27,6 +27,21 @@ extern "C" {
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
typedef
struct
{
rocksdb_t
*
db
;
rocksdb_writeoptions_t
*
writeOpts
;
rocksdb_readoptions_t
*
readOpts
;
rocksdb_options_t
*
dbOpt
;
void
*
param
;
void
*
env
;
rocksdb_cache_t
*
cache
;
TdThreadMutex
mutex
;
SList
*
list
;
}
SBackendHandle
;
void
*
streamBackendInit
(
const
char
*
path
);
void
streamBackendCleanup
(
void
*
arg
);
SListNode
*
streamBackendAddCompare
(
void
*
backend
,
void
*
arg
);
void
streamBackendDelCompare
(
void
*
backend
,
void
*
arg
);
typedef
bool
(
*
state_key_cmpr_fn
)(
void
*
pKey1
,
void
*
pKey2
);
typedef
struct
STdbState
{
...
...
@@ -35,11 +50,12 @@ typedef struct STdbState {
rocksdb_writeoptions_t
*
writeOpts
;
rocksdb_readoptions_t
*
readOpts
;
rocksdb_options_t
**
cfOpts
;
rocksdb_comparator_t
**
pCompare
;
rocksdb_options_t
*
dbOpt
;
struct
SStreamTask
*
pOwner
;
void
*
param
;
void
*
env
;
SListNode
*
pComparNode
;
SBackendHandle
*
pBackendHandle
;
TDB
*
db
;
TTB
*
pStateDb
;
...
...
@@ -58,13 +74,15 @@ typedef struct {
int32_t
number
;
SSHashObj
*
parNameMap
;
int64_t
checkPointId
;
int32_t
taskId
;
int32_t
streamId
;
}
SStreamState
;
SStreamState
*
streamStateOpen
(
char
*
path
,
struct
SStreamTask
*
pTask
,
bool
specPath
,
int32_t
szPage
,
int32_t
pages
);
void
streamStateClose
(
SStreamState
*
pState
);
void
streamStateClose
(
SStreamState
*
pState
,
bool
remove
);
int32_t
streamStateBegin
(
SStreamState
*
pState
);
int32_t
streamStateCommit
(
SStreamState
*
pState
);
void
streamStateDestroy
(
SStreamState
*
pState
);
void
streamStateDestroy
(
SStreamState
*
pState
,
bool
remove
);
int32_t
streamStateDeleteCheckPoint
(
SStreamState
*
pState
,
TSKEY
mark
);
typedef
struct
{
...
...
include/libs/stream/tstream.h
浏览文件 @
36a4413b
...
...
@@ -347,6 +347,7 @@ typedef struct SStreamMeta {
int32_t
vgId
;
SRWLatch
lock
;
int32_t
walScan
;
void
*
streamBackend
;
}
SStreamMeta
;
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
...
...
include/util/tlist.h
浏览文件 @
36a4413b
...
...
@@ -228,6 +228,7 @@ void tdListPrependNode(SList *list, SListNode *node);
void
tdListAppendNode
(
SList
*
list
,
SListNode
*
node
);
int32_t
tdListPrepend
(
SList
*
list
,
void
*
data
);
int32_t
tdListAppend
(
SList
*
list
,
const
void
*
data
);
SListNode
*
tdListAdd
(
SList
*
list
,
const
void
*
data
);
SListNode
*
tdListPopHead
(
SList
*
list
);
SListNode
*
tdListPopTail
(
SList
*
list
);
SListNode
*
tdListGetHead
(
SList
*
list
);
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
36a4413b
...
...
@@ -90,7 +90,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
}
if
(
isDeepFree
&&
pItem
->
pStreamState
)
{
streamStateClose
(
pItem
->
pStreamState
);
streamStateClose
(
pItem
->
pStreamState
,
false
);
}
if
(
isDeepFree
&&
pInfo
->
taskInfo
[
i
])
{
...
...
source/libs/stream/inc/streamBackendRocksdb.h
浏览文件 @
36a4413b
...
...
@@ -24,8 +24,14 @@
#include "tcompare.h"
#include "ttimer.h"
int
streamInitBackend
(
SStreamState
*
pState
,
char
*
path
);
void
streamCleanBackend
(
SStreamState
*
pState
);
typedef
struct
SCfComparator
{
rocksdb_comparator_t
**
comp
;
int32_t
numOfComp
;
}
SCfComparator
;
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
);
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
);
void
streamStateDestroyCompar
(
void
*
arg
);
// void streamStateRemoveBackend(SStreamState* pState);
int32_t
streamStateFuncPut_rocksdb
(
SStreamState
*
pState
,
const
STupleKey
*
key
,
const
void
*
value
,
int32_t
vLen
);
int32_t
streamStateFuncGet_rocksdb
(
SStreamState
*
pState
,
const
STupleKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
...
...
@@ -73,7 +79,7 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t grou
int32_t
streamStateGetParTag_rocksdb
(
SStreamState
*
pState
,
int64_t
groupId
,
void
**
tagVal
,
int32_t
*
tagLen
);
int32_t
streamStatePutParName_rocksdb
(
SStreamState
*
pState
,
int64_t
groupId
,
const
char
tbname
[
TSDB_TABLE_NAME_LEN
]);
int32_t
streamStateGetParName_rocksdb
(
SStreamState
*
pState
,
int64_t
groupId
,
void
**
pVal
);
void
streamStateDestroy_rocksdb
(
SStreamState
*
pState
);
void
streamStateDestroy_rocksdb
(
SStreamState
*
pState
,
bool
remove
);
void
*
streamStateCreateBatch
();
int32_t
streamStateGetBatchSize
(
void
*
pBatch
);
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
36a4413b
...
...
@@ -65,6 +65,19 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta
->
vgId
=
vgId
;
pMeta
->
ahandle
=
ahandle
;
pMeta
->
expandFunc
=
expandFunc
;
char
*
statePath
=
taosMemoryCalloc
(
1
,
len
);
sprintf
(
statePath
,
"%s/%s"
,
pMeta
->
path
,
"state"
);
code
=
taosMulModeMkDir
(
statePath
,
0755
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
taosMemoryFree
(
streamPath
);
goto
_err
;
}
pMeta
->
streamBackend
=
streamBackendInit
(
statePath
);
taosMemoryFree
(
statePath
);
taosInitRWLatch
(
&
pMeta
->
lock
);
return
pMeta
;
...
...
@@ -74,6 +87,7 @@ _err:
if
(
pMeta
->
pTaskDb
)
tdbTbClose
(
pMeta
->
pTaskDb
);
if
(
pMeta
->
pCheckpointDb
)
tdbTbClose
(
pMeta
->
pCheckpointDb
);
if
(
pMeta
->
db
)
tdbClose
(
pMeta
->
db
);
if
(
pMeta
->
streamBackend
)
streamBackendCleanup
(
pMeta
->
streamBackend
);
taosMemoryFree
(
pMeta
);
return
NULL
;
}
...
...
@@ -101,6 +115,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup
(
pMeta
->
pTasks
);
streamBackendCleanup
(
pMeta
->
streamBackend
);
taosMemoryFree
(
pMeta
->
path
);
taosMemoryFree
(
pMeta
);
}
...
...
@@ -184,9 +199,7 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return
0
;
}
int32_t
streamMetaGetNumOfTasks
(
const
SStreamMeta
*
pMeta
)
{
return
(
int32_t
)
taosHashGetSize
(
pMeta
->
pTasks
);
}
int32_t
streamMetaGetNumOfTasks
(
const
SStreamMeta
*
pMeta
)
{
return
(
int32_t
)
taosHashGetSize
(
pMeta
->
pTasks
);
}
SStreamTask
*
streamMetaAcquireTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
)
{
taosRLockLatch
(
&
pMeta
->
lock
);
...
...
@@ -220,7 +233,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
taosHashRemove
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
tdbTbDelete
(
pMeta
->
pTaskDb
,
&
taskId
,
sizeof
(
int32_t
),
pMeta
->
txn
);
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__STOP
);
//
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__DROPPING
);
taosWLockLatch
(
&
pMeta
->
lock
);
streamMetaReleaseTask
(
pMeta
,
pTask
);
...
...
source/libs/stream/src/streamState.c
浏览文件 @
36a4413b
...
...
@@ -26,6 +26,88 @@
#define MAX_TABLE_NAME_NUM 100000
void
*
streamBackendInit
(
const
char
*
path
)
{
SBackendHandle
*
pHandle
=
calloc
(
1
,
sizeof
(
SBackendHandle
));
pHandle
->
list
=
tdListNew
(
sizeof
(
SCfComparator
));
taosThreadMutexInit
(
&
pHandle
->
mutex
,
NULL
);
rocksdb_env_t
*
env
=
rocksdb_create_default_env
();
// rocksdb_envoptions_create();
rocksdb_env_set_low_priority_background_threads
(
env
,
4
);
rocksdb_env_set_high_priority_background_threads
(
env
,
2
);
rocksdb_cache_t
*
cache
=
rocksdb_cache_create_lru
(
128
<<
20
);
rocksdb_options_t
*
opts
=
rocksdb_options_create
();
rocksdb_options_set_env
(
opts
,
env
);
rocksdb_options_set_create_if_missing
(
opts
,
1
);
rocksdb_options_set_create_missing_column_families
(
opts
,
1
);
rocksdb_options_set_write_buffer_size
(
opts
,
128
<<
20
);
rocksdb_options_set_max_total_wal_size
(
opts
,
128
<<
20
);
rocksdb_options_set_recycle_log_file_num
(
opts
,
6
);
rocksdb_options_set_max_write_buffer_number
(
opts
,
3
);
pHandle
->
env
=
env
;
pHandle
->
dbOpt
=
opts
;
pHandle
->
cache
=
cache
;
char
*
err
=
NULL
;
pHandle
->
db
=
rocksdb_open
(
opts
,
path
,
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to open rocksdb, path:%s, reason:%s"
,
path
,
err
);
taosMemoryFreeClear
(
err
);
goto
_EXIT
;
}
return
pHandle
;
_EXIT:
rocksdb_options_destroy
(
opts
);
rocksdb_cache_destroy
(
cache
);
rocksdb_env_destroy
(
env
);
taosThreadMutexDestroy
(
&
pHandle
->
mutex
);
tdListFree
(
pHandle
->
list
);
free
(
pHandle
);
return
NULL
;
}
void
streamBackendCleanup
(
void
*
arg
)
{
SBackendHandle
*
pHandle
=
(
SBackendHandle
*
)
arg
;
rocksdb_close
(
pHandle
->
db
);
rocksdb_options_destroy
(
pHandle
->
dbOpt
);
rocksdb_env_destroy
(
pHandle
->
env
);
rocksdb_cache_destroy
(
pHandle
->
cache
);
taosThreadMutexDestroy
(
&
pHandle
->
mutex
);
SListNode
*
head
=
tdListPopHead
(
pHandle
->
list
);
while
(
head
!=
NULL
)
{
streamStateDestroyCompar
(
head
->
data
);
taosMemoryFree
(
head
);
head
=
tdListPopHead
(
pHandle
->
list
);
}
tdListFree
(
pHandle
->
list
);
taosMemoryFree
(
pHandle
);
return
;
}
SListNode
*
streamBackendAddCompare
(
void
*
backend
,
void
*
arg
)
{
SBackendHandle
*
pHandle
=
(
SBackendHandle
*
)
backend
;
SListNode
*
node
=
NULL
;
taosThreadMutexLock
(
&
pHandle
->
mutex
);
node
=
tdListAdd
(
pHandle
->
list
,
arg
);
taosThreadMutexUnlock
(
&
pHandle
->
mutex
);
return
node
;
}
void
streamBackendDelCompare
(
void
*
backend
,
void
*
arg
)
{
SBackendHandle
*
pHandle
=
(
SBackendHandle
*
)
backend
;
SListNode
*
node
=
NULL
;
taosThreadMutexLock
(
&
pHandle
->
mutex
);
node
=
tdListPopNode
(
pHandle
->
list
,
arg
);
taosThreadMutexUnlock
(
&
pHandle
->
mutex
);
if
(
node
)
{
streamStateDestroyCompar
(
node
->
data
);
taosMemoryFree
(
node
);
}
}
int
sessionRangeKeyCmpr
(
const
SSessionKey
*
pWin1
,
const
SSessionKey
*
pWin2
)
{
if
(
pWin1
->
groupId
>
pWin2
->
groupId
)
{
return
1
;
...
...
@@ -100,7 +182,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState
->
pTdbState
=
taosMemoryCalloc
(
1
,
sizeof
(
STdbState
));
if
(
pState
->
pTdbState
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
streamStateDestroy
(
pState
);
streamStateDestroy
(
pState
,
true
);
return
NULL
;
}
...
...
@@ -111,9 +193,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset
(
statePath
,
0
,
1024
);
tstrncpy
(
statePath
,
path
,
1024
);
}
pState
->
taskId
=
pTask
->
id
.
taskId
;
pState
->
streamId
=
pTask
->
id
.
streamId
;
#ifdef USE_ROCKSDB
qWarn
(
"open stream state1"
);
int
code
=
stream
InitBackend
(
pState
,
statePath
);
int
code
=
stream
StateOpenBackend
(
pTask
->
pMeta
->
streamBackend
,
pState
);
if
(
code
==
-
1
)
{
taosMemoryFree
(
pState
);
pState
=
NULL
;
...
...
@@ -205,14 +289,15 @@ _err:
tdbTbClose
(
pState
->
pTdbState
->
pParNameDb
);
tdbTbClose
(
pState
->
pTdbState
->
pParTagDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
streamStateDestroy
(
pState
);
streamStateDestroy
(
pState
,
false
);
return
NULL
;
#endif
}
void
streamStateClose
(
SStreamState
*
pState
)
{
void
streamStateClose
(
SStreamState
*
pState
,
bool
remove
)
{
#ifdef USE_ROCKSDB
streamCleanBackend
(
pState
);
// streamStateCloseBackend(pState);
streamStateDestroy
(
pState
,
remove
);
#else
tdbCommit
(
pState
->
pTdbState
->
db
,
pState
->
pTdbState
->
txn
);
tdbPostCommit
(
pState
->
pTdbState
->
db
,
pState
->
pTdbState
->
txn
);
...
...
@@ -224,7 +309,6 @@ void streamStateClose(SStreamState* pState) {
tdbTbClose
(
pState
->
pTdbState
->
pParTagDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
#endif
streamStateDestroy
(
pState
);
}
int32_t
streamStateBegin
(
SStreamState
*
pState
)
{
...
...
@@ -388,6 +472,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
#ifdef USE_ROCKSDB
int32_t
code
=
0
;
void
*
batch
=
streamStateCreateBatch
();
code
=
streamStatePutBatch
(
pState
,
"default"
,
batch
,
pKey
,
pVal
,
vLen
);
if
(
code
!=
0
)
{
return
code
;
...
...
@@ -1077,10 +1162,10 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
#endif
}
void
streamStateDestroy
(
SStreamState
*
pState
)
{
void
streamStateDestroy
(
SStreamState
*
pState
,
bool
remove
)
{
#ifdef USE_ROCKSDB
streamFileStateDestroy
(
pState
->
pFileState
);
streamStateDestroy_rocksdb
(
pState
);
streamStateDestroy_rocksdb
(
pState
,
remove
);
tSimpleHashCleanup
(
pState
->
parNameMap
);
// do nothong
#endif
...
...
source/libs/stream/src/streamStateRocksdb.c
浏览文件 @
36a4413b
...
...
@@ -324,7 +324,6 @@ int32_t streaValueIsStale(void* k, int64_t ts) {
typedef
struct
{
void
*
tableOpt
;
void
*
lru
;
// global or not
}
rocksdbCfParam
;
const
char
*
cfName
[]
=
{
"default"
,
"state"
,
"fill"
,
"sess"
,
"func"
,
"parname"
,
"partag"
};
...
...
@@ -358,6 +357,9 @@ typedef struct {
}
SCfInit
;
#define GEN_COLUMN_FAMILY_NAME(name, streamId, taskId, SUBFIX) \
sprintf(name, "%d_%d_%s", (streamId), (taskId), (SUBFIX));
SCfInit
ginitDict
[]
=
{
{
"default"
,
7
,
0
,
defaultKeyComp
,
defaultKeyEncode
,
defaultKeyDecode
,
defaultKeyToString
,
compareDefaultName
,
destroyFunc
},
...
...
@@ -378,21 +380,9 @@ const char* compareFuncKeyName(void* name) { return ginitDict[4].key; }
const
char
*
compareParKeyName
(
void
*
name
)
{
return
ginitDict
[
5
].
key
;
}
const
char
*
comparePartagKeyName
(
void
*
name
)
{
return
ginitDict
[
6
].
key
;
}
int
streamInitBackend
(
SStreamState
*
pState
,
char
*
path
)
{
rocksdb_env_t
*
env
=
rocksdb_create_default_env
();
// rocksdb_envoptions_create();
rocksdb_env_set_low_priority_background_threads
(
env
,
4
);
rocksdb_env_set_high_priority_background_threads
(
env
,
2
);
rocksdb_options_t
*
opts
=
rocksdb_options_create
();
rocksdb_options_set_env
(
opts
,
env
);
// rocksdb_options_increase_parallelism(opts, 8);
// rocksdb_options_optimize_level_style_compaction(opts, 0);
// create the DB if it's not already present
rocksdb_options_set_create_if_missing
(
opts
,
1
);
rocksdb_options_set_create_missing_column_families
(
opts
,
1
);
rocksdb_options_set_write_buffer_size
(
opts
,
64
<<
20
);
rocksdb_options_set_recycle_log_file_num
(
opts
,
6
);
rocksdb_options_set_max_write_buffer_number
(
opts
,
3
);
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
)
{
qError
(
"start to open backend, %p, %d-%d"
,
pState
,
pState
->
streamId
,
pState
->
taskId
);
SBackendHandle
*
handle
=
backend
;
char
*
err
=
NULL
;
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
...
...
@@ -400,11 +390,10 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdbCfParam
*
param
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdbCfParam
));
const
rocksdb_options_t
**
cfOpt
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_options_t
*
));
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
cfOpt
[
i
]
=
rocksdb_options_create
_copy
(
opts
);
cfOpt
[
i
]
=
rocksdb_options_create
(
);
// refactor later
rocksdb_block_based_table_options_t
*
tableOpt
=
rocksdb_block_based_options_create
();
rocksdb_cache_t
*
cache
=
rocksdb_cache_create_lru
(
64
<<
20
);
rocksdb_block_based_options_set_block_cache
(
tableOpt
,
cache
);
rocksdb_block_based_options_set_block_cache
(
tableOpt
,
handle
->
cache
);
rocksdb_filterpolicy_t
*
filter
=
rocksdb_filterpolicy_create_bloom
(
15
);
rocksdb_block_based_options_set_filter_policy
(
tableOpt
,
filter
);
...
...
@@ -412,73 +401,93 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_set_block_based_table_factory
((
rocksdb_options_t
*
)
cfOpt
[
i
],
tableOpt
);
param
[
i
].
tableOpt
=
tableOpt
;
param
[
i
].
lru
=
cache
;
// rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8);
// rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans);
};
rocksdb_comparator_t
**
pCompare
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_comparator_t
**
));
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
SCfInit
*
cf
=
&
ginitDict
[
i
];
SCfInit
*
cf
=
&
ginitDict
[
i
];
rocksdb_comparator_t
*
compare
=
rocksdb_comparator_create
(
NULL
,
cf
->
detroyFunc
,
cf
->
cmpFunc
,
cf
->
cmpName
);
rocksdb_options_set_comparator
((
rocksdb_options_t
*
)
cfOpt
[
i
],
compare
);
pCompare
[
i
]
=
compare
;
}
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryMalloc
(
cfLen
*
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_t
*
db
=
rocksdb_open_column_families
(
opts
,
path
,
cfLen
,
cfName
,
cfOpt
,
cfHandle
,
&
err
);
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
char
buf
[
64
]
=
{
0
};
GEN_COLUMN_FAMILY_NAME
(
buf
,
pState
->
streamId
,
pState
->
taskId
,
ginitDict
[
i
].
key
);
cfHandle
[
i
]
=
rocksdb_create_column_family
(
handle
->
db
,
cfOpt
[
i
],
buf
,
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"rocksdb create column family failed, reason:%s"
,
err
);
taosMemoryFree
(
err
);
return
-
1
;
}
}
pState
->
pTdbState
->
rocksdb
=
db
;
pState
->
pTdbState
->
rocksdb
=
handle
->
db
;
pState
->
pTdbState
->
pHandle
=
cfHandle
;
pState
->
pTdbState
->
writeOpts
=
rocksdb_writeoptions_create
();
// rocksdb_writeoptions_
// rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->writeOpts, 1);
pState
->
pTdbState
->
readOpts
=
rocksdb_readoptions_create
();
pState
->
pTdbState
->
cfOpts
=
(
rocksdb_options_t
**
)
cfOpt
;
pState
->
pTdbState
->
pCompare
=
pCompare
;
pState
->
pTdbState
->
dbOpt
=
opts
;
//
pState->pTdbState->pCompare = pCompare;
pState
->
pTdbState
->
dbOpt
=
handle
->
dbOpt
;
pState
->
pTdbState
->
param
=
param
;
pState
->
pTdbState
->
env
=
env
;
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
pState
->
pTdbState
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
rocksdb_writeoptions_disable_WAL
(
pState
->
pTdbState
->
writeOpts
,
1
);
qError
(
"end to open backend, %p"
,
pState
);
return
0
;
}
void
streamCleanBackend
(
SStreamState
*
pState
)
{
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
)
{
char
*
status
[]
=
{
"remove"
,
"drop"
};
qError
(
"start to %s backend, %p, %d-%d"
,
status
[
remove
==
false
?
1
:
0
],
pState
,
pState
->
streamId
,
pState
->
taskId
);
if
(
pState
->
pTdbState
->
rocksdb
==
NULL
)
{
qInfo
(
"rocksdb already free"
);
return
;
}
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
rocksdbCfParam
*
param
=
pState
->
pTdbState
->
param
;
char
*
err
=
NULL
;
rocksdb_flushoptions_t
*
flushOpt
=
rocksdb_flushoptions_create
();
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
remove
)
{
rocksdb_drop_column_family
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
pHandle
[
i
],
&
err
);
}
else
{
rocksdb_flush_cf
(
pState
->
pTdbState
->
rocksdb
,
flushOpt
,
pState
->
pTdbState
->
pHandle
[
i
],
&
err
);
}
}
rocksdb_flushoptions_destroy
(
flushOpt
);
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
rocksdb_column_family_handle_destroy
(
pState
->
pTdbState
->
pHandle
[
i
]);
}
taosMemoryFreeClear
(
pState
->
pTdbState
->
pHandle
);
rocksdb_options_destroy
(
pState
->
pTdbState
->
dbOpt
);
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
rocksdb_options_destroy
(
pState
->
pTdbState
->
cfOpts
[
i
]);
rocksdb_block_based_options_destroy
(
param
[
i
].
tableOpt
);
}
if
(
remove
)
{
streamBackendDelCompare
(
pState
->
pTdbState
->
pBackendHandle
,
pState
->
pTdbState
->
pComparNode
);
}
rocksdb_writeoptions_destroy
(
pState
->
pTdbState
->
writeOpts
);
pState
->
pTdbState
->
writeOpts
=
NULL
;
rocksdb_readoptions_destroy
(
pState
->
pTdbState
->
readOpts
);
pState
->
pTdbState
->
readOpts
=
NULL
;
rocksdb_close
(
pState
->
pTdbState
->
rocksdb
);
// wait for all background work to finish
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
rocksdb_options_destroy
(
pState
->
pTdbState
->
cfOpts
[
i
]);
rocksdb_comparator_destroy
(
pState
->
pTdbState
->
pCompare
[
i
]);
rocksdb_cache_destroy
(
param
[
i
].
lru
);
rocksdb_block_based_options_destroy
(
param
[
i
].
tableOpt
);
}
taosMemoryFreeClear
(
pState
->
pTdbState
->
cfOpts
);
taosMemoryFree
(
pState
->
pTdbState
->
pCompare
);
taosMemoryFree
(
pState
->
pTdbState
->
param
);
rocksdb_env_destroy
(
pState
->
pTdbState
->
env
);
taosMemoryFreeClear
(
pState
->
pTdbState
->
param
);
pState
->
pTdbState
->
rocksdb
=
NULL
;
}
void
streamStateDestroyCompar
(
void
*
arg
)
{
SCfComparator
*
comp
=
(
SCfComparator
*
)
arg
;
for
(
int
i
=
0
;
i
<
comp
->
numOfComp
;
i
++
)
{
rocksdb_comparator_destroy
(
comp
->
comp
[
i
]);
}
taosMemoryFree
(
comp
->
comp
);
}
int
streamGetInit
(
const
char
*
funcName
)
{
size_t
len
=
strlen
(
funcName
);
for
(
int
i
=
0
;
i
<
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
i
++
)
{
...
...
@@ -1540,7 +1549,7 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
return
code
;
}
void
streamStateDestroy_rocksdb
(
SStreamState
*
pState
)
{
void
streamStateDestroy_rocksdb
(
SStreamState
*
pState
,
bool
remove
)
{
// only close db
stream
CleanBackend
(
pStat
e
);
stream
StateCloseBackend
(
pState
,
remov
e
);
}
\ No newline at end of file
source/libs/stream/src/streamTask.c
浏览文件 @
36a4413b
...
...
@@ -27,7 +27,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) {
pTask
->
id
.
streamId
=
streamId
;
char
buf
[
128
]
=
{
0
};
sprintf
(
buf
,
"0x%"
PRIx64
"-%d"
,
pTask
->
id
.
streamId
,
pTask
->
id
.
taskId
);
sprintf
(
buf
,
"0x%"
PRIx64
"-%d"
,
pTask
->
id
.
streamId
,
pTask
->
id
.
taskId
);
pTask
->
id
.
idStr
=
taosStrdup
(
buf
);
pTask
->
status
.
schedStatus
=
TASK_SCHED_STATUS__INACTIVE
;
...
...
@@ -171,7 +171,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
void
tFreeStreamTask
(
SStreamTask
*
pTask
)
{
qDebug
(
"free s-task:%s"
,
pTask
->
id
.
idStr
);
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
(
pTask
->
status
.
taskStatus
));
if
(
pTask
->
inputQueue
)
{
streamQueueClose
(
pTask
->
inputQueue
);
}
...
...
@@ -204,10 +204,10 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
if
(
pTask
->
pState
)
{
streamStateClose
(
pTask
->
pState
);
streamStateClose
(
pTask
->
pState
,
status
==
TASK_STATUS__DROPPING
);
}
if
(
pTask
->
id
.
idStr
!=
NULL
)
{
if
(
pTask
->
id
.
idStr
!=
NULL
)
{
taosMemoryFree
((
void
*
)
pTask
->
id
.
idStr
);
}
...
...
source/util/src/tlist.c
浏览文件 @
36a4413b
...
...
@@ -87,6 +87,15 @@ int32_t tdListAppend(SList *list, const void *data) {
return
0
;
}
// return the node pointer
SListNode
*
tdListAdd
(
SList
*
list
,
const
void
*
data
)
{
SListNode
*
node
=
(
SListNode
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SListNode
)
+
list
->
eleSize
);
if
(
node
==
NULL
)
return
NULL
;
memcpy
((
void
*
)(
node
->
data
),
data
,
list
->
eleSize
);
TD_DLIST_APPEND
(
list
,
node
);
return
node
;
}
SListNode
*
tdListPopHead
(
SList
*
list
)
{
SListNode
*
node
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录