Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0eeaab9c
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
“051549a85a10f0a76ae6864d4846dca4fb853a0c”上不存在“docs/zh/09-data-out/index.md”
提交
0eeaab9c
编写于
6月 19, 2023
作者:
Y
yihaoDeng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'refact/fillhistory1' into refact/fillhistory
上级
64365666
1e05f5c6
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
527 addition
and
403 deletion
+527
-403
cmake/rocksdb_CMakeLists.txt.in
cmake/rocksdb_CMakeLists.txt.in
+10
-7
include/libs/function/function.h
include/libs/function/function.h
+30
-22
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+38
-37
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+34
-28
source/libs/stream/inc/streamBackendRocksdb.h
source/libs/stream/inc/streamBackendRocksdb.h
+2
-1
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+2
-1
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+345
-270
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+16
-2
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+50
-35
未找到文件。
cmake/rocksdb_CMakeLists.txt.in
浏览文件 @
0eeaab9c
# rocksdb
# rocksdb
ExternalProject_Add(rocksdb
ExternalProject_Add(rocksdb
GIT_REPOSITORY https://github.com/facebook/rocksdb.git
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
GIT_TAG v8.1.1
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
TEST_COMMAND ""
)
)
include/libs/function/function.h
浏览文件 @
0eeaab9c
...
@@ -129,22 +129,30 @@ typedef struct SSerializeDataHandle {
...
@@ -129,22 +129,30 @@ typedef struct SSerializeDataHandle {
}
SSerializeDataHandle
;
}
SSerializeDataHandle
;
// incremental state storage
// incremental state storage
typedef
struct
STdbState
{
typedef
struct
SBackendCfWrapper
{
void
*
rocksdb
;
void
*
rocksdb
;
void
**
pHandle
;
void
**
pHandle
;
void
*
writeOpts
;
void
*
writeOpts
;
void
*
readOpts
;
void
*
readOpts
;
void
**
cfOpts
;
void
**
cfOpts
;
void
*
dbOpt
;
void
*
dbOpt
;
struct
SStreamTask
*
pOwner
;
void
*
param
;
void
*
param
;
void
*
env
;
void
*
env
;
SListNode
*
pComparNode
;
SListNode
*
pComparNode
;
void
*
pBackend
;
void
*
pBackend
;
char
idstr
[
64
];
void
*
compactFactory
;
void
*
compactFactory
;
TdThreadRwlock
rwLock
;
TdThreadRwlock
rwLock
;
bool
remove
;
int64_t
backendId
;
char
idstr
[
64
];
}
SBackendCfWrapper
;
typedef
struct
STdbState
{
SBackendCfWrapper
*
pBackendCfWrapper
;
int64_t
backendCfWrapperId
;
char
idstr
[
64
];
struct
SStreamTask
*
pOwner
;
void
*
db
;
void
*
db
;
void
*
pStateDb
;
void
*
pStateDb
;
void
*
pFuncStateDb
;
void
*
pFuncStateDb
;
...
...
include/libs/stream/tstream.h
浏览文件 @
0eeaab9c
...
@@ -366,6 +366,7 @@ typedef struct SStreamMeta {
...
@@ -366,6 +366,7 @@ typedef struct SStreamMeta {
void
*
streamBackend
;
void
*
streamBackend
;
int64_t
streamBackendRid
;
int64_t
streamBackendRid
;
SHashObj
*
pTaskBackendUnique
;
SHashObj
*
pTaskBackendUnique
;
TdThreadMutex
backendMutex
;
}
SStreamMeta
;
}
SStreamMeta
;
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
...
@@ -587,10 +588,10 @@ int32_t streamSetStatusNormal(SStreamTask* pTask);
...
@@ -587,10 +588,10 @@ int32_t streamSetStatusNormal(SStreamTask* pTask);
const
char
*
streamGetTaskStatusStr
(
int32_t
status
);
const
char
*
streamGetTaskStatusStr
(
int32_t
status
);
// source level
// source level
int32_t
streamSetParamForStreamScanner
(
SStreamTask
*
pTask
,
SVersionRange
*
pVerRange
,
STimeWindow
*
pWindow
);
int32_t
streamSetParamForStreamScanner
(
SStreamTask
*
pTask
,
SVersionRange
*
pVerRange
,
STimeWindow
*
pWindow
);
int32_t
streamBuildSourceRecover1Req
(
SStreamTask
*
pTask
,
SStreamScanHistoryReq
*
pReq
,
int8_t
igUntreated
);
int32_t
streamBuildSourceRecover1Req
(
SStreamTask
*
pTask
,
SStreamScanHistoryReq
*
pReq
,
int8_t
igUntreated
);
int32_t
streamSourceScanHistoryData
(
SStreamTask
*
pTask
);
int32_t
streamSourceScanHistoryData
(
SStreamTask
*
pTask
);
//int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
//
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t
streamDispatchScanHistoryFinishMsg
(
SStreamTask
*
pTask
);
int32_t
streamDispatchScanHistoryFinishMsg
(
SStreamTask
*
pTask
);
int32_t
streamDispatchTransferStateMsg
(
SStreamTask
*
pTask
);
int32_t
streamDispatchTransferStateMsg
(
SStreamTask
*
pTask
);
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
0eeaab9c
...
@@ -25,7 +25,8 @@
...
@@ -25,7 +25,8 @@
#define SINK_NODE_LEVEL (0)
#define SINK_NODE_LEVEL (0)
extern
bool
tsDeployOnSnode
;
extern
bool
tsDeployOnSnode
;
static
int32_t
mndAddSinkTaskToStream
(
SStreamObj
*
pStream
,
SArray
*
pTaskList
,
SMnode
*
pMnode
,
int32_t
vgId
,
SVgObj
*
pVgroup
,
int32_t
fillHistory
);
static
int32_t
mndAddSinkTaskToStream
(
SStreamObj
*
pStream
,
SArray
*
pTaskList
,
SMnode
*
pMnode
,
int32_t
vgId
,
SVgObj
*
pVgroup
,
int32_t
fillHistory
);
static
void
setFixedDownstreamEpInfo
(
SStreamTask
*
pDstTask
,
const
SStreamTask
*
pTask
);
static
void
setFixedDownstreamEpInfo
(
SStreamTask
*
pDstTask
,
const
SStreamTask
*
pTask
);
int32_t
mndConvertRsmaTask
(
char
**
pDst
,
int32_t
*
pDstLen
,
const
char
*
ast
,
int64_t
uid
,
int8_t
triggerType
,
int32_t
mndConvertRsmaTask
(
char
**
pDst
,
int32_t
*
pDstLen
,
const
char
*
ast
,
int64_t
uid
,
int8_t
triggerType
,
...
@@ -101,13 +102,13 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
...
@@ -101,13 +102,13 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
return
0
;
return
0
;
}
}
int32_t
mndAddDispatcherForInternalTask
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SArray
*
pSinkNodeList
,
SStreamTask
*
pTask
)
{
int32_t
mndAddDispatcherForInternalTask
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SArray
*
pSinkNodeList
,
SStreamTask
*
pTask
)
{
bool
isShuffle
=
false
;
bool
isShuffle
=
false
;
if
(
pStream
->
fixedSinkVgId
==
0
)
{
if
(
pStream
->
fixedSinkVgId
==
0
)
{
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pStream
->
targetDb
);
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pStream
->
targetDb
);
if
(
pDb
!=
NULL
&&
pDb
->
cfg
.
numOfVgroups
>
1
)
{
if
(
pDb
!=
NULL
&&
pDb
->
cfg
.
numOfVgroups
>
1
)
{
isShuffle
=
true
;
isShuffle
=
true
;
pTask
->
outputType
=
TASK_OUTPUT__SHUFFLE_DISPATCH
;
pTask
->
outputType
=
TASK_OUTPUT__SHUFFLE_DISPATCH
;
pTask
->
msgInfo
.
msgType
=
TDMT_STREAM_TASK_DISPATCH
;
pTask
->
msgInfo
.
msgType
=
TDMT_STREAM_TASK_DISPATCH
;
...
@@ -226,7 +227,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
...
@@ -226,7 +227,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
return
0
;
return
0
;
}
}
int32_t
mndAddSinkTaskToStream
(
SStreamObj
*
pStream
,
SArray
*
pTaskList
,
SMnode
*
pMnode
,
int32_t
vgId
,
SVgObj
*
pVgroup
,
int32_t
fillHistory
)
{
int32_t
mndAddSinkTaskToStream
(
SStreamObj
*
pStream
,
SArray
*
pTaskList
,
SMnode
*
pMnode
,
int32_t
vgId
,
SVgObj
*
pVgroup
,
int32_t
fillHistory
)
{
SStreamTask
*
pTask
=
tNewStreamTask
(
pStream
->
uid
,
TASK_LEVEL__SINK
,
fillHistory
,
0
,
pTaskList
);
SStreamTask
*
pTask
=
tNewStreamTask
(
pStream
->
uid
,
TASK_LEVEL__SINK
,
fillHistory
,
0
,
pTaskList
);
if
(
pTask
==
NULL
)
{
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -249,8 +251,8 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
...
@@ -249,8 +251,8 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
// todo set the correct ts, which should be last key of queried table.
// todo set the correct ts, which should be last key of queried table.
pTask
->
dataRange
.
window
.
skey
=
INT64_MIN
;
pTask
->
dataRange
.
window
.
skey
=
INT64_MIN
;
pTask
->
dataRange
.
window
.
ekey
=
1685959190000
;
//
taosGetTimestampMs();
pTask
->
dataRange
.
window
.
ekey
=
1685959190000
;
//
taosGetTimestampMs();
// pTask->dataRange.window.ekey = firstWindowSkey - 1;//taosGetTimestampMs();
// pTask->dataRange.window.ekey = firstWindowSkey - 1;//taosGetTimestampMs();
mDebug
(
"add source task 0x%x window:%"
PRId64
" - %"
PRId64
,
pTask
->
id
.
taskId
,
pTask
->
dataRange
.
window
.
skey
,
mDebug
(
"add source task 0x%x window:%"
PRId64
" - %"
PRId64
,
pTask
->
id
.
taskId
,
pTask
->
dataRange
.
window
.
skey
,
pTask
->
dataRange
.
window
.
ekey
);
pTask
->
dataRange
.
window
.
ekey
);
...
@@ -300,7 +302,7 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
...
@@ -300,7 +302,7 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
pDownstream
->
pUpstreamEpInfoList
==
NULL
)
{
if
(
pDownstream
->
pUpstreamEpInfoList
==
NULL
)
{
pDownstream
->
pUpstreamEpInfoList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pDownstream
->
pUpstreamEpInfoList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
}
}
...
@@ -316,7 +318,7 @@ static SArray* addNewTaskList(SArray* pTasksList) {
...
@@ -316,7 +318,7 @@ static SArray* addNewTaskList(SArray* pTasksList) {
// set the history task id
// set the history task id
static
void
setHTasksId
(
SArray
*
pTaskList
,
const
SArray
*
pHTaskList
)
{
static
void
setHTasksId
(
SArray
*
pTaskList
,
const
SArray
*
pHTaskList
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTaskList
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTaskList
);
++
i
)
{
SStreamTask
**
pStreamTask
=
taosArrayGet
(
pTaskList
,
i
);
SStreamTask
**
pStreamTask
=
taosArrayGet
(
pTaskList
,
i
);
SStreamTask
**
pHTask
=
taosArrayGet
(
pHTaskList
,
i
);
SStreamTask
**
pHTask
=
taosArrayGet
(
pHTaskList
,
i
);
...
@@ -369,8 +371,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
...
@@ -369,8 +371,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
// new stream task
// new stream task
SArray
**
pSinkTaskList
=
taosArrayGet
(
pStream
->
tasks
,
SINK_NODE_LEVEL
);
SArray
**
pSinkTaskList
=
taosArrayGet
(
pStream
->
tasks
,
SINK_NODE_LEVEL
);
int32_t
code
=
addSourceStreamTask
(
pMnode
,
pVgroup
,
pTaskList
,
*
pSinkTaskList
,
pStream
,
plan
,
pStream
->
uid
,
int32_t
code
=
addSourceStreamTask
(
pMnode
,
pVgroup
,
pTaskList
,
*
pSinkTaskList
,
pStream
,
plan
,
pStream
->
uid
,
0
,
0
,
hasExtraSink
,
lastTs
);
hasExtraSink
,
lastTs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbRelease
(
pSdb
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
return
-
1
;
...
@@ -392,8 +394,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
...
@@ -392,8 +394,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
doAddSourceTask
(
SArray
*
pTaskList
,
int8_t
fillHistory
,
int64_t
uid
,
SStreamTask
*
pDownstreamTask
,
SMnode
*
pMnode
,
static
int32_t
doAddSourceTask
(
SArray
*
pTaskList
,
int8_t
fillHistory
,
int64_t
uid
,
SStreamTask
*
pDownstreamTask
,
SSubplan
*
pPlan
,
SVgObj
*
pVgroup
)
{
S
Mnode
*
pMnode
,
S
Subplan
*
pPlan
,
SVgObj
*
pVgroup
)
{
SStreamTask
*
pTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__SOURCE
,
fillHistory
,
0
,
pTaskList
);
SStreamTask
*
pTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__SOURCE
,
fillHistory
,
0
,
pTaskList
);
if
(
pTask
==
NULL
)
{
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -402,7 +404,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
...
@@ -402,7 +404,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
// todo set the correct ts, which should be last key of queried table.
// todo set the correct ts, which should be last key of queried table.
pTask
->
dataRange
.
window
.
skey
=
INT64_MIN
;
pTask
->
dataRange
.
window
.
skey
=
INT64_MIN
;
pTask
->
dataRange
.
window
.
ekey
=
1685959190000
;
//
taosGetTimestampMs();
pTask
->
dataRange
.
window
.
ekey
=
1685959190000
;
//
taosGetTimestampMs();
mDebug
(
"s-task:0x%x level:%d set time window:%"
PRId64
" - %"
PRId64
,
pTask
->
id
.
taskId
,
pTask
->
info
.
taskLevel
,
mDebug
(
"s-task:0x%x level:%d set time window:%"
PRId64
" - %"
PRId64
,
pTask
->
id
.
taskId
,
pTask
->
info
.
taskLevel
,
pTask
->
dataRange
.
window
.
skey
,
pTask
->
dataRange
.
window
.
ekey
);
pTask
->
dataRange
.
window
.
skey
,
pTask
->
dataRange
.
window
.
ekey
);
...
@@ -416,8 +418,8 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
...
@@ -416,8 +418,8 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
return
setEpToDownstreamTask
(
pTask
,
pDownstreamTask
);
return
setEpToDownstreamTask
(
pTask
,
pDownstreamTask
);
}
}
static
int32_t
doAddAggTask
(
uint64_t
uid
,
SArray
*
pTaskList
,
SArray
*
pSinkNodeList
,
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
int32_t
fillHistory
,
static
int32_t
doAddAggTask
(
uint64_t
uid
,
SArray
*
pTaskList
,
SArray
*
pSinkNodeList
,
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SStreamTask
**
pAggTask
)
{
int32_t
fillHistory
,
SStreamTask
**
pAggTask
)
{
*
pAggTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__AGG
,
fillHistory
,
pStream
->
conf
.
triggerParam
,
pTaskList
);
*
pAggTask
=
tNewStreamTask
(
uid
,
TASK_LEVEL__AGG
,
fillHistory
,
pStream
->
conf
.
triggerParam
,
pTaskList
);
if
(
*
pAggTask
==
NULL
)
{
if
(
*
pAggTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -475,7 +477,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
...
@@ -475,7 +477,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
SArray
*
pHSinkNodeList
=
taosArrayGetP
(
pStream
->
pHTasksList
,
SINK_NODE_LEVEL
);
SArray
*
pHSinkNodeList
=
taosArrayGetP
(
pStream
->
pHTasksList
,
SINK_NODE_LEVEL
);
*
pHAggTask
=
NULL
;
*
pHAggTask
=
NULL
;
code
=
doAddAggTask
(
pStream
->
hTaskUid
,
pHAggTaskList
,
pHSinkNodeList
,
pMnode
,
pStream
,
pStream
->
conf
.
fillHistory
,
pHAggTask
);
code
=
doAddAggTask
(
pStream
->
hTaskUid
,
pHAggTaskList
,
pHSinkNodeList
,
pMnode
,
pStream
,
pStream
->
conf
.
fillHistory
,
pHAggTask
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pSnode
!=
NULL
)
{
if
(
pSnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
pSnode
);
sdbRelease
(
pSdb
,
pSnode
);
...
@@ -541,7 +544,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
...
@@ -541,7 +544,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
}
if
(
pStream
->
conf
.
fillHistory
)
{
if
(
pStream
->
conf
.
fillHistory
)
{
code
=
doAddSourceTask
(
pHSourceTaskList
,
pStream
->
conf
.
fillHistory
,
pStream
->
hTaskUid
,
pHDownstreamTask
,
pMnode
,
plan
,
pVgroup
);
code
=
doAddSourceTask
(
pHSourceTaskList
,
pStream
->
conf
.
fillHistory
,
pStream
->
hTaskUid
,
pHDownstreamTask
,
pMnode
,
plan
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
@@ -555,7 +559,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
...
@@ -555,7 +559,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
addSinkTasks
(
SArray
*
pTasksList
,
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SArray
**
pCreatedTaskList
,
int32_t
fillHistory
)
{
static
int32_t
addSinkTasks
(
SArray
*
pTasksList
,
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SArray
**
pCreatedTaskList
,
int32_t
fillHistory
)
{
SArray
*
pSinkTaskList
=
addNewTaskList
(
pTasksList
);
SArray
*
pSinkTaskList
=
addNewTaskList
(
pTasksList
);
if
(
pStream
->
fixedSinkVgId
==
0
)
{
if
(
pStream
->
fixedSinkVgId
==
0
)
{
if
(
mndAddShuffleSinkTasksToStream
(
pMnode
,
pSinkTaskList
,
pStream
,
fillHistory
)
<
0
)
{
if
(
mndAddShuffleSinkTasksToStream
(
pMnode
,
pSinkTaskList
,
pStream
,
fillHistory
)
<
0
)
{
...
@@ -563,7 +568,8 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
...
@@ -563,7 +568,8 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
return
-
1
;
return
-
1
;
}
}
}
else
{
}
else
{
if
(
mndAddSinkTaskToStream
(
pStream
,
pSinkTaskList
,
pMnode
,
pStream
->
fixedSinkVgId
,
&
pStream
->
fixedSinkVg
,
fillHistory
)
<
0
)
{
if
(
mndAddSinkTaskToStream
(
pStream
,
pSinkTaskList
,
pMnode
,
pStream
->
fixedSinkVgId
,
&
pStream
->
fixedSinkVg
,
fillHistory
)
<
0
)
{
// TODO free
// TODO free
return
-
1
;
return
-
1
;
}
}
...
@@ -658,8 +664,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
...
@@ -658,8 +664,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
return
-
1
;
}
}
}
else
if
(
pTopic
->
subType
==
TOPIC_SUB_TYPE__TABLE
&&
pTopic
->
ast
!=
NULL
)
{
}
else
if
(
pTopic
->
subType
==
TOPIC_SUB_TYPE__TABLE
&&
pTopic
->
ast
!=
NULL
)
{
SNode
*
pAst
=
NULL
;
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pTopic
->
ast
,
&
pAst
)
!=
0
)
{
if
(
nodesStringToNode
(
pTopic
->
ast
,
&
pAst
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pTopic
->
name
,
terrstr
());
mError
(
"topic:%s, failed to create since %s"
,
pTopic
->
name
,
terrstr
());
return
-
1
;
return
-
1
;
...
@@ -674,7 +680,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
...
@@ -674,7 +680,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
nodesDestroyNode
(
pAst
);
nodesDestroyNode
(
pAst
);
}
}
if
(
pPlan
)
{
if
(
pPlan
)
{
int32_t
levelNum
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
int32_t
levelNum
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
if
(
levelNum
!=
1
)
{
if
(
levelNum
!=
1
)
{
qDestroyQueryPlan
(
pPlan
);
qDestroyQueryPlan
(
pPlan
);
...
...
source/libs/stream/inc/streamBackendRocksdb.h
浏览文件 @
0eeaab9c
...
@@ -42,10 +42,11 @@ typedef struct {
...
@@ -42,10 +42,11 @@ typedef struct {
TdThreadMutex
cfMutex
;
TdThreadMutex
cfMutex
;
SHashObj
*
cfInst
;
SHashObj
*
cfInst
;
int64_t
defaultCfInit
;
int64_t
defaultCfInit
;
}
SBackend
Handle
;
}
SBackend
Wrapper
;
void
*
streamBackendInit
(
const
char
*
path
);
void
*
streamBackendInit
(
const
char
*
path
);
void
streamBackendCleanup
(
void
*
arg
);
void
streamBackendCleanup
(
void
*
arg
);
void
streamBackendHandleCleanup
(
void
*
arg
);
SListNode
*
streamBackendAddCompare
(
void
*
backend
,
void
*
arg
);
SListNode
*
streamBackendAddCompare
(
void
*
backend
,
void
*
arg
);
void
streamBackendDelCompare
(
void
*
backend
,
void
*
arg
);
void
streamBackendDelCompare
(
void
*
backend
,
void
*
arg
);
...
...
source/libs/stream/inc/streamInc.h
浏览文件 @
0eeaab9c
...
@@ -55,6 +55,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
...
@@ -55,6 +55,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
SStreamQueueItem
*
streamMergeQueueItem
(
SStreamQueueItem
*
dst
,
SStreamQueueItem
*
pElem
);
SStreamQueueItem
*
streamMergeQueueItem
(
SStreamQueueItem
*
dst
,
SStreamQueueItem
*
pElem
);
extern
int32_t
streamBackendId
;
extern
int32_t
streamBackendId
;
extern
int32_t
streamBackendCfWrapperId
;
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
0eeaab9c
...
@@ -40,15 +40,7 @@ typedef struct {
...
@@ -40,15 +40,7 @@ typedef struct {
rocksdb_comparator_t
**
pCompares
;
rocksdb_comparator_t
**
pCompares
;
}
RocksdbCfInst
;
}
RocksdbCfInst
;
uint32_t
nextPow2
(
uint32_t
x
)
{
uint32_t
nextPow2
(
uint32_t
x
);
x
=
x
-
1
;
x
=
x
|
(
x
>>
1
);
x
=
x
|
(
x
>>
2
);
x
=
x
|
(
x
>>
4
);
x
=
x
|
(
x
>>
8
);
x
=
x
|
(
x
>>
16
);
return
x
+
1
;
}
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
);
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
);
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
);
void
destroyRocksdbCfInst
(
RocksdbCfInst
*
inst
);
...
@@ -71,7 +63,22 @@ typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const
...
@@ -71,7 +63,22 @@ typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const
typedef
void
(
*
DestroyFunc
)(
void
*
state
);
typedef
void
(
*
DestroyFunc
)(
void
*
state
);
typedef
int32_t
(
*
EncodeValueFunc
)(
void
*
value
,
int32_t
vlen
,
int64_t
ttl
,
char
**
dest
);
typedef
int32_t
(
*
EncodeValueFunc
)(
void
*
value
,
int32_t
vlen
,
int64_t
ttl
,
char
**
dest
);
typedef
int32_t
(
*
DecodeValueFunc
)(
void
*
value
,
int32_t
vlen
,
int64_t
*
ttl
,
char
**
dest
);
typedef
int32_t
(
*
DecodeValueFunc
)(
void
*
value
,
int32_t
vlen
,
int64_t
*
ttl
,
char
**
dest
);
typedef
struct
{
const
char
*
key
;
int32_t
len
;
int
idx
;
BackendCmpFunc
cmpFunc
;
EncodeFunc
enFunc
;
DecodeFunc
deFunc
;
ToStringFunc
toStrFunc
;
CompareName
cmpName
;
DestroyFunc
detroyFunc
;
EncodeValueFunc
enValueFunc
;
DecodeValueFunc
deValueFunc
;
}
SCfInit
;
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
const
char
*
compareDefaultName
(
void
*
name
);
const
char
*
compareDefaultName
(
void
*
name
);
const
char
*
compareStateName
(
void
*
name
);
const
char
*
compareStateName
(
void
*
name
);
const
char
*
compareWinKeyName
(
void
*
name
);
const
char
*
compareWinKeyName
(
void
*
name
);
...
@@ -80,11 +87,67 @@ const char* compareFuncKeyName(void* name);
...
@@ -80,11 +87,67 @@ const char* compareFuncKeyName(void* name);
const
char
*
compareParKeyName
(
void
*
name
);
const
char
*
compareParKeyName
(
void
*
name
);
const
char
*
comparePartagKeyName
(
void
*
name
);
const
char
*
comparePartagKeyName
(
void
*
name
);
int
defaultKeyComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
);
int
defaultKeyEncode
(
void
*
k
,
char
*
buf
);
int
defaultKeyDecode
(
void
*
k
,
char
*
buf
);
int
defaultKeyToString
(
void
*
k
,
char
*
buf
);
int
stateKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
);
int
stateKeyEncode
(
void
*
k
,
char
*
buf
);
int
stateKeyDecode
(
void
*
k
,
char
*
buf
);
int
stateKeyToString
(
void
*
k
,
char
*
buf
);
int
stateSessionKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
);
int
stateSessionKeyEncode
(
void
*
ses
,
char
*
buf
);
int
stateSessionKeyDecode
(
void
*
ses
,
char
*
buf
);
int
stateSessionKeyToString
(
void
*
k
,
char
*
buf
);
int
winKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
);
int
winKeyEncode
(
void
*
k
,
char
*
buf
);
int
winKeyDecode
(
void
*
k
,
char
*
buf
);
int
winKeyToString
(
void
*
k
,
char
*
buf
);
int
tupleKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
);
int
tupleKeyEncode
(
void
*
k
,
char
*
buf
);
int
tupleKeyDecode
(
void
*
k
,
char
*
buf
);
int
tupleKeyToString
(
void
*
k
,
char
*
buf
);
int
parKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
);
int
parKeyEncode
(
void
*
k
,
char
*
buf
);
int
parKeyDecode
(
void
*
k
,
char
*
buf
);
int
parKeyToString
(
void
*
k
,
char
*
buf
);
int
stremaValueEncode
(
void
*
k
,
char
*
buf
);
int
streamValueDecode
(
void
*
k
,
char
*
buf
);
int32_t
streamValueToString
(
void
*
k
,
char
*
buf
);
int32_t
streaValueIsStale
(
void
*
k
,
int64_t
ts
);
void
destroyFunc
(
void
*
arg
);
int32_t
encodeValueFunc
(
void
*
value
,
int32_t
vlen
,
int64_t
ttl
,
char
**
dest
);
int32_t
decodeValueFunc
(
void
*
value
,
int32_t
vlen
,
int64_t
*
ttl
,
char
**
dest
);
SCfInit
ginitDict
[]
=
{
{
"default"
,
7
,
0
,
defaultKeyComp
,
defaultKeyEncode
,
defaultKeyDecode
,
defaultKeyToString
,
compareDefaultName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"state"
,
5
,
1
,
stateKeyDBComp
,
stateKeyEncode
,
stateKeyDecode
,
stateKeyToString
,
compareStateName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"fill"
,
4
,
2
,
winKeyDBComp
,
winKeyEncode
,
winKeyDecode
,
winKeyToString
,
compareWinKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"sess"
,
4
,
3
,
stateSessionKeyDBComp
,
stateSessionKeyEncode
,
stateSessionKeyDecode
,
stateSessionKeyToString
,
compareSessionKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"func"
,
4
,
4
,
tupleKeyDBComp
,
tupleKeyEncode
,
tupleKeyDecode
,
tupleKeyToString
,
compareFuncKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"parname"
,
7
,
5
,
parKeyDBComp
,
parKeyEncode
,
parKeyDecode
,
parKeyToString
,
compareParKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"partag"
,
6
,
6
,
parKeyDBComp
,
parKeyEncode
,
parKeyDecode
,
parKeyToString
,
comparePartagKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
};
void
*
streamBackendInit
(
const
char
*
path
)
{
void
*
streamBackendInit
(
const
char
*
path
)
{
uint32_t
dbMemLimit
=
nextPow2
(
tsMaxStreamBackendCache
)
<<
20
;
uint32_t
dbMemLimit
=
nextPow2
(
tsMaxStreamBackendCache
)
<<
20
;
qDebug
(
"start to init stream backend at %s"
,
path
);
qDebug
(
"start to init stream backend at %s"
,
path
);
SBackend
Handle
*
pHandle
=
taosMemoryCalloc
(
1
,
sizeof
(
SBackendHandle
));
SBackend
Wrapper
*
pHandle
=
taosMemoryCalloc
(
1
,
sizeof
(
SBackendWrapper
));
pHandle
->
list
=
tdListNew
(
sizeof
(
SCfComparator
));
pHandle
->
list
=
tdListNew
(
sizeof
(
SCfComparator
));
taosThreadMutexInit
(
&
pHandle
->
mutex
,
NULL
);
taosThreadMutexInit
(
&
pHandle
->
mutex
,
NULL
);
taosThreadMutexInit
(
&
pHandle
->
cfMutex
,
NULL
);
taosThreadMutexInit
(
&
pHandle
->
cfMutex
,
NULL
);
...
@@ -154,7 +217,7 @@ _EXIT:
...
@@ -154,7 +217,7 @@ _EXIT:
return
NULL
;
return
NULL
;
}
}
void
streamBackendCleanup
(
void
*
arg
)
{
void
streamBackendCleanup
(
void
*
arg
)
{
SBackend
Handle
*
pHandle
=
(
SBackendHandle
*
)
arg
;
SBackend
Wrapper
*
pHandle
=
(
SBackendWrapper
*
)
arg
;
RocksdbCfInst
**
pIter
=
(
RocksdbCfInst
**
)
taosHashIterate
(
pHandle
->
cfInst
,
NULL
);
RocksdbCfInst
**
pIter
=
(
RocksdbCfInst
**
)
taosHashIterate
(
pHandle
->
cfInst
,
NULL
);
while
(
pIter
!=
NULL
)
{
while
(
pIter
!=
NULL
)
{
RocksdbCfInst
*
inst
=
*
pIter
;
RocksdbCfInst
*
inst
=
*
pIter
;
...
@@ -194,8 +257,70 @@ void streamBackendCleanup(void* arg) {
...
@@ -194,8 +257,70 @@ void streamBackendCleanup(void* arg) {
qDebug
(
"destroy stream backend backend:%p"
,
pHandle
);
qDebug
(
"destroy stream backend backend:%p"
,
pHandle
);
return
;
return
;
}
}
void
streamBackendHandleCleanup
(
void
*
arg
)
{
SBackendCfWrapper
*
wrapper
=
arg
;
bool
remove
=
wrapper
->
remove
;
qDebug
(
"start to do-close backendwrapper %p, %s"
,
wrapper
,
wrapper
->
idstr
);
if
(
wrapper
->
rocksdb
==
NULL
)
{
return
;
}
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
char
*
err
=
NULL
;
if
(
remove
)
{
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
wrapper
->
pHandle
[
i
]
!=
NULL
)
rocksdb_drop_column_family
(
wrapper
->
rocksdb
,
((
rocksdb_column_family_handle_t
**
)
wrapper
->
pHandle
)[
i
],
&
err
);
if
(
err
!=
NULL
)
{
// qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
taosMemoryFreeClear
(
err
);
}
}
}
else
{
rocksdb_flushoptions_t
*
flushOpt
=
rocksdb_flushoptions_create
();
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
wrapper
->
pHandle
[
i
]
!=
NULL
)
rocksdb_flush_cf
(
wrapper
->
rocksdb
,
flushOpt
,
wrapper
->
pHandle
[
i
],
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to create cf:%s_%s, reason:%s"
,
wrapper
->
idstr
,
ginitDict
[
i
].
key
,
err
);
taosMemoryFreeClear
(
err
);
}
}
rocksdb_flushoptions_destroy
(
flushOpt
);
}
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
wrapper
->
pHandle
[
i
]
!=
NULL
)
{
rocksdb_column_family_handle_destroy
(
wrapper
->
pHandle
[
i
]);
}
}
taosMemoryFreeClear
(
wrapper
->
pHandle
);
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
rocksdb_options_destroy
(
wrapper
->
cfOpts
[
i
]);
rocksdb_block_based_options_destroy
(((
RocksdbCfParam
*
)
wrapper
->
param
)[
i
].
tableOpt
);
}
if
(
remove
)
{
streamBackendDelCompare
(
wrapper
->
pBackend
,
wrapper
->
pComparNode
);
}
rocksdb_writeoptions_destroy
(
wrapper
->
writeOpts
);
wrapper
->
writeOpts
=
NULL
;
rocksdb_readoptions_destroy
(
wrapper
->
readOpts
);
wrapper
->
readOpts
=
NULL
;
taosMemoryFreeClear
(
wrapper
->
cfOpts
);
taosMemoryFreeClear
(
wrapper
->
param
);
taosThreadRwlockDestroy
(
&
wrapper
->
rwLock
);
wrapper
->
rocksdb
=
NULL
;
taosReleaseRef
(
streamBackendId
,
wrapper
->
backendId
);
qDebug
(
"end to do-close backendwrapper %p, %s"
,
wrapper
,
wrapper
->
idstr
);
taosMemoryFree
(
wrapper
);
return
;
}
SListNode
*
streamBackendAddCompare
(
void
*
backend
,
void
*
arg
)
{
SListNode
*
streamBackendAddCompare
(
void
*
backend
,
void
*
arg
)
{
SBackend
Handle
*
pHandle
=
(
SBackendHandle
*
)
backend
;
SBackend
Wrapper
*
pHandle
=
(
SBackendWrapper
*
)
backend
;
SListNode
*
node
=
NULL
;
SListNode
*
node
=
NULL
;
taosThreadMutexLock
(
&
pHandle
->
mutex
);
taosThreadMutexLock
(
&
pHandle
->
mutex
);
node
=
tdListAdd
(
pHandle
->
list
,
arg
);
node
=
tdListAdd
(
pHandle
->
list
,
arg
);
...
@@ -203,7 +328,7 @@ SListNode* streamBackendAddCompare(void* backend, void* arg) {
...
@@ -203,7 +328,7 @@ SListNode* streamBackendAddCompare(void* backend, void* arg) {
return
node
;
return
node
;
}
}
void
streamBackendDelCompare
(
void
*
backend
,
void
*
arg
)
{
void
streamBackendDelCompare
(
void
*
backend
,
void
*
arg
)
{
SBackend
Handle
*
pHandle
=
(
SBackendHandle
*
)
backend
;
SBackend
Wrapper
*
pHandle
=
(
SBackendWrapper
*
)
backend
;
SListNode
*
node
=
NULL
;
SListNode
*
node
=
NULL
;
taosThreadMutexLock
(
&
pHandle
->
mutex
);
taosThreadMutexLock
(
&
pHandle
->
mutex
);
node
=
tdListPopNode
(
pHandle
->
list
,
arg
);
node
=
tdListPopNode
(
pHandle
->
list
,
arg
);
...
@@ -542,23 +667,6 @@ void destroyFunc(void* arg) {
...
@@ -542,23 +667,6 @@ void destroyFunc(void* arg) {
return
;
return
;
}
}
typedef
struct
{
const
char
*
key
;
int32_t
len
;
int
idx
;
BackendCmpFunc
cmpFunc
;
EncodeFunc
enFunc
;
DecodeFunc
deFunc
;
ToStringFunc
toStrFunc
;
CompareName
cmpName
;
DestroyFunc
detroyFunc
;
EncodeValueFunc
enValueFunc
;
DecodeValueFunc
deValueFunc
;
}
SCfInit
;
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t
encodeValueFunc
(
void
*
value
,
int32_t
vlen
,
int64_t
ttl
,
char
**
dest
)
{
int32_t
encodeValueFunc
(
void
*
value
,
int32_t
vlen
,
int64_t
ttl
,
char
**
dest
)
{
SStreamValue
key
=
{.
unixTimestamp
=
ttl
,
.
len
=
vlen
,
.
data
=
(
char
*
)(
value
)};
SStreamValue
key
=
{.
unixTimestamp
=
ttl
,
.
len
=
vlen
,
.
data
=
(
char
*
)(
value
)};
int32_t
len
=
0
;
int32_t
len
=
0
;
...
@@ -613,22 +721,6 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
...
@@ -613,22 +721,6 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
}
}
return
key
.
len
;
return
key
.
len
;
}
}
SCfInit
ginitDict
[]
=
{
{
"default"
,
7
,
0
,
defaultKeyComp
,
defaultKeyEncode
,
defaultKeyDecode
,
defaultKeyToString
,
compareDefaultName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"state"
,
5
,
1
,
stateKeyDBComp
,
stateKeyEncode
,
stateKeyDecode
,
stateKeyToString
,
compareStateName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"fill"
,
4
,
2
,
winKeyDBComp
,
winKeyEncode
,
winKeyDecode
,
winKeyToString
,
compareWinKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"sess"
,
4
,
3
,
stateSessionKeyDBComp
,
stateSessionKeyEncode
,
stateSessionKeyDecode
,
stateSessionKeyToString
,
compareSessionKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"func"
,
4
,
4
,
tupleKeyDBComp
,
tupleKeyEncode
,
tupleKeyDecode
,
tupleKeyToString
,
compareFuncKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"parname"
,
7
,
5
,
parKeyDBComp
,
parKeyEncode
,
parKeyDecode
,
parKeyToString
,
compareParKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
{
"partag"
,
6
,
6
,
parKeyDBComp
,
parKeyEncode
,
parKeyDecode
,
parKeyToString
,
comparePartagKeyName
,
destroyFunc
,
encodeValueFunc
,
decodeValueFunc
},
};
const
char
*
compareDefaultName
(
void
*
arg
)
{
const
char
*
compareDefaultName
(
void
*
arg
)
{
(
void
)
arg
;
(
void
)
arg
;
...
@@ -697,7 +789,7 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
...
@@ -697,7 +789,7 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
}
}
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
)
{
int32_t
streamStateOpenBackendCf
(
void
*
backend
,
char
*
name
,
char
**
cfs
,
int32_t
nCf
)
{
SBackend
Handle
*
handle
=
backend
;
SBackend
Wrapper
*
handle
=
backend
;
char
*
err
=
NULL
;
char
*
err
=
NULL
;
int64_t
streamId
;
int64_t
streamId
;
int32_t
taskId
,
dummy
=
0
;
int32_t
taskId
,
dummy
=
0
;
...
@@ -821,23 +913,30 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
...
@@ -821,23 +913,30 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
)
{
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
)
{
qInfo
(
"start to open state %p on backend %p 0x%"
PRIx64
"-%d"
,
pState
,
backend
,
pState
->
streamId
,
pState
->
taskId
);
qInfo
(
"start to open state %p on backend %p 0x%"
PRIx64
"-%d"
,
pState
,
backend
,
pState
->
streamId
,
pState
->
taskId
);
taosAcquireRef
(
streamBackendId
,
pState
->
streamBackendRid
);
taosAcquireRef
(
streamBackendId
,
pState
->
streamBackendRid
);
SBackendHandle
*
handle
=
backend
;
SBackendWrapper
*
handle
=
backend
;
SBackendCfWrapper
*
pBackendCfWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SBackendCfWrapper
));
sprintf
(
pState
->
pTdbState
->
idstr
,
"0x%"
PRIx64
"-%d"
,
pState
->
streamId
,
pState
->
taskId
);
taosThreadMutexLock
(
&
handle
->
cfMutex
);
taosThreadMutexLock
(
&
handle
->
cfMutex
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
handle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
handle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
RocksdbCfInst
*
inst
=
*
ppInst
;
RocksdbCfInst
*
inst
=
*
ppInst
;
p
State
->
pTdbState
->
rocksdb
=
inst
->
db
;
p
BackendCfWrapper
->
rocksdb
=
inst
->
db
;
p
State
->
pTdbState
->
pHandle
=
(
void
**
)
inst
->
pHandle
;
p
BackendCfWrapper
->
pHandle
=
(
void
**
)
inst
->
pHandle
;
p
State
->
pTdbState
->
writeOpts
=
inst
->
wOpt
;
p
BackendCfWrapper
->
writeOpts
=
inst
->
wOpt
;
p
State
->
pTdbState
->
readOpts
=
inst
->
rOpt
;
p
BackendCfWrapper
->
readOpts
=
inst
->
rOpt
;
p
State
->
pTdbState
->
cfOpts
=
(
void
**
)(
inst
->
cfOpt
);
p
BackendCfWrapper
->
cfOpts
=
(
void
**
)(
inst
->
cfOpt
);
p
State
->
pTdbState
->
dbOpt
=
handle
->
dbOpt
;
p
BackendCfWrapper
->
dbOpt
=
handle
->
dbOpt
;
p
State
->
pTdbState
->
param
=
inst
->
param
;
p
BackendCfWrapper
->
param
=
inst
->
param
;
p
State
->
pTdbState
->
pBackend
=
handle
;
p
BackendCfWrapper
->
pBackend
=
handle
;
p
State
->
pTdbState
->
pComparNode
=
inst
->
pCompareNode
;
p
BackendCfWrapper
->
pComparNode
=
inst
->
pCompareNode
;
taosThreadMutexUnlock
(
&
handle
->
cfMutex
);
taosThreadMutexUnlock
(
&
handle
->
cfMutex
);
pBackendCfWrapper
->
backendId
=
pState
->
streamBackendRid
;
memcpy
(
pBackendCfWrapper
->
idstr
,
pState
->
pTdbState
->
idstr
,
sizeof
(
pState
->
pTdbState
->
idstr
));
int64_t
id
=
taosAddRef
(
streamBackendCfWrapperId
,
pBackendCfWrapper
);
pState
->
pTdbState
->
backendCfWrapperId
=
id
;
pState
->
pTdbState
->
pBackendCfWrapper
=
pBackendCfWrapper
;
qInfo
(
"succ to open state %p on backendWrapper, %p, %s"
,
pState
,
pBackendCfWrapper
,
pBackendCfWrapper
->
idstr
);
return
0
;
return
0
;
}
}
taosThreadMutexUnlock
(
&
handle
->
cfMutex
);
taosThreadMutexUnlock
(
&
handle
->
cfMutex
);
...
@@ -870,27 +969,33 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
...
@@ -870,27 +969,33 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
pCompare
[
i
]
=
compare
;
pCompare
[
i
]
=
compare
;
}
}
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_column_family_handle_t
*
));
p
State
->
pTdbState
->
rocksdb
=
handle
->
db
;
p
BackendCfWrapper
->
rocksdb
=
handle
->
db
;
p
State
->
pTdbState
->
pHandle
=
(
void
**
)
cfHandle
;
p
BackendCfWrapper
->
pHandle
=
(
void
**
)
cfHandle
;
p
State
->
pTdbState
->
writeOpts
=
rocksdb_writeoptions_create
();
p
BackendCfWrapper
->
writeOpts
=
rocksdb_writeoptions_create
();
p
State
->
pTdbState
->
readOpts
=
rocksdb_readoptions_create
();
p
BackendCfWrapper
->
readOpts
=
rocksdb_readoptions_create
();
p
State
->
pTdbState
->
cfOpts
=
(
void
**
)
cfOpt
;
p
BackendCfWrapper
->
cfOpts
=
(
void
**
)
cfOpt
;
p
State
->
pTdbState
->
dbOpt
=
handle
->
dbOpt
;
p
BackendCfWrapper
->
dbOpt
=
handle
->
dbOpt
;
p
State
->
pTdbState
->
param
=
param
;
p
BackendCfWrapper
->
param
=
param
;
p
State
->
pTdbState
->
pBackend
=
handle
;
p
BackendCfWrapper
->
pBackend
=
handle
;
pBackendCfWrapper
->
backendId
=
pState
->
streamBackendRid
;
taosThreadRwlockInit
(
&
p
State
->
pTdbState
->
rwLock
,
NULL
);
taosThreadRwlockInit
(
&
p
BackendCfWrapper
->
rwLock
,
NULL
);
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
pState
->
pTdbState
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
pBackendCfWrapper
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
rocksdb_writeoptions_disable_WAL
(
pState
->
pTdbState
->
writeOpts
,
1
);
rocksdb_writeoptions_disable_WAL
(
pBackendCfWrapper
->
writeOpts
,
1
);
qInfo
(
"succ to open state %p on backend, %p, 0x%"
PRIx64
"-%d"
,
pState
,
handle
,
pState
->
streamId
,
pState
->
taskId
);
memcpy
(
pBackendCfWrapper
->
idstr
,
pState
->
pTdbState
->
idstr
,
sizeof
(
pState
->
pTdbState
->
idstr
));
int64_t
id
=
taosAddRef
(
streamBackendCfWrapperId
,
pBackendCfWrapper
);
pState
->
pTdbState
->
backendCfWrapperId
=
id
;
pState
->
pTdbState
->
pBackendCfWrapper
=
pBackendCfWrapper
;
qInfo
(
"succ to open state %p on backendWrapper %p %s"
,
pState
,
pBackendCfWrapper
,
pBackendCfWrapper
->
idstr
);
return
0
;
return
0
;
}
}
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
)
{
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
)
{
SBackendHandle
*
pHandle
=
pState
->
pTdbState
->
pBackend
;
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SBackendWrapper
*
pHandle
=
wrapper
->
pBackend
;
taosThreadMutexLock
(
&
pHandle
->
cfMutex
);
taosThreadMutexLock
(
&
pHandle
->
cfMutex
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
pHandle
->
cfInst
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
pHandle
->
cfInst
,
wrapper
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
RocksdbCfInst
*
inst
=
*
ppInst
;
RocksdbCfInst
*
inst
=
*
ppInst
;
taosMemoryFree
(
inst
);
taosMemoryFree
(
inst
);
...
@@ -899,63 +1004,10 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
...
@@ -899,63 +1004,10 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadMutexUnlock
(
&
pHandle
->
cfMutex
);
taosThreadMutexUnlock
(
&
pHandle
->
cfMutex
);
char
*
status
[]
=
{
"close"
,
"drop"
};
char
*
status
[]
=
{
"close"
,
"drop"
};
qInfo
(
"start to close %s state %p on backend %p 0x%"
PRIx64
"-%d"
,
status
[
remove
==
false
?
0
:
1
],
pState
,
pHandle
,
qInfo
(
"start to close %s state %p on backendWrapper %p %s"
,
status
[
remove
==
false
?
0
:
1
],
pState
,
wrapper
,
pState
->
streamId
,
pState
->
taskId
);
wrapper
->
idstr
);
if
(
pState
->
pTdbState
->
rocksdb
==
NULL
)
{
wrapper
->
remove
|=
remove
;
// update by other pState
return
;
taosReleaseRef
(
streamBackendCfWrapperId
,
pState
->
pTdbState
->
backendCfWrapperId
);
}
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
char
*
err
=
NULL
;
if
(
remove
)
{
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
pState
->
pTdbState
->
pHandle
[
i
]
!=
NULL
)
rocksdb_drop_column_family
(
pState
->
pTdbState
->
rocksdb
,
((
rocksdb_column_family_handle_t
**
)
pState
->
pTdbState
->
pHandle
)[
i
],
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to create cf:%s_%s, reason:%s"
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
,
err
);
taosMemoryFreeClear
(
err
);
}
}
}
else
{
rocksdb_flushoptions_t
*
flushOpt
=
rocksdb_flushoptions_create
();
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
pState
->
pTdbState
->
pHandle
[
i
]
!=
NULL
)
rocksdb_flush_cf
(
pState
->
pTdbState
->
rocksdb
,
flushOpt
,
pState
->
pTdbState
->
pHandle
[
i
],
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"failed to create cf:%s_%s, reason:%s"
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
,
err
);
taosMemoryFreeClear
(
err
);
}
}
rocksdb_flushoptions_destroy
(
flushOpt
);
}
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
if
(
pState
->
pTdbState
->
pHandle
[
i
]
!=
NULL
)
{
rocksdb_column_family_handle_destroy
(
pState
->
pTdbState
->
pHandle
[
i
]);
}
}
taosMemoryFreeClear
(
pState
->
pTdbState
->
pHandle
);
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
rocksdb_options_destroy
(
pState
->
pTdbState
->
cfOpts
[
i
]);
rocksdb_block_based_options_destroy
(((
RocksdbCfParam
*
)
pState
->
pTdbState
->
param
)[
i
].
tableOpt
);
}
if
(
remove
)
{
streamBackendDelCompare
(
pState
->
pTdbState
->
pBackend
,
pState
->
pTdbState
->
pComparNode
);
}
rocksdb_writeoptions_destroy
(
pState
->
pTdbState
->
writeOpts
);
pState
->
pTdbState
->
writeOpts
=
NULL
;
rocksdb_readoptions_destroy
(
pState
->
pTdbState
->
readOpts
);
pState
->
pTdbState
->
readOpts
=
NULL
;
taosMemoryFreeClear
(
pState
->
pTdbState
->
cfOpts
);
taosMemoryFreeClear
(
pState
->
pTdbState
->
param
);
taosThreadRwlockDestroy
(
&
pState
->
pTdbState
->
rwLock
);
pState
->
pTdbState
->
rocksdb
=
NULL
;
taosReleaseRef
(
streamBackendId
,
pState
->
streamBackendRid
);
}
}
void
streamStateDestroyCompar
(
void
*
arg
)
{
void
streamStateDestroyCompar
(
void
*
arg
)
{
SCfComparator
*
comp
=
(
SCfComparator
*
)
arg
;
SCfComparator
*
comp
=
(
SCfComparator
*
)
arg
;
...
@@ -974,27 +1026,27 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
...
@@ -974,27 +1026,27 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
break
;
break
;
}
}
}
}
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
if
(
pState
!=
NULL
&&
idx
!=
-
1
)
{
if
(
pState
!=
NULL
&&
idx
!=
-
1
)
{
rocksdb_column_family_handle_t
*
cf
=
NULL
;
rocksdb_column_family_handle_t
*
cf
=
NULL
;
taosThreadRwlockRdlock
(
&
pState
->
pTdbState
->
rwLock
);
taosThreadRwlockRdlock
(
&
wrapper
->
rwLock
);
cf
=
pState
->
pTdbState
->
pHandle
[
idx
];
cf
=
wrapper
->
pHandle
[
idx
];
taosThreadRwlockUnlock
(
&
pState
->
pTdbState
->
rwLock
);
taosThreadRwlockUnlock
(
&
wrapper
->
rwLock
);
if
(
cf
==
NULL
)
{
if
(
cf
==
NULL
)
{
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
GEN_COLUMN_FAMILY_NAME
(
buf
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
idx
].
key
);
GEN_COLUMN_FAMILY_NAME
(
buf
,
wrapper
->
idstr
,
ginitDict
[
idx
].
key
);
char
*
err
=
NULL
;
char
*
err
=
NULL
;
taosThreadRwlockWrlock
(
&
pState
->
pTdbState
->
rwLock
);
taosThreadRwlockWrlock
(
&
wrapper
->
rwLock
);
cf
=
rocksdb_create_column_family
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
cfOpts
[
idx
],
buf
,
&
err
);
cf
=
rocksdb_create_column_family
(
wrapper
->
rocksdb
,
wrapper
->
cfOpts
[
idx
],
buf
,
&
err
);
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
idx
=
-
1
;
idx
=
-
1
;
qError
(
"failed to to open cf, %p 0x%"
PRIx64
"-%d_%s, reason:%s"
,
pState
,
pState
->
streamId
,
pState
->
taskId
,
qError
(
"failed to to open cf, %p %s_%s, reason:%s"
,
pState
,
wrapper
->
idstr
,
funcName
,
err
);
funcName
,
err
);
taosMemoryFree
(
err
);
taosMemoryFree
(
err
);
}
else
{
}
else
{
pState
->
pTdbState
->
pHandle
[
idx
]
=
cf
;
wrapper
->
pHandle
[
idx
]
=
cf
;
}
}
taosThreadRwlockUnlock
(
&
pState
->
pTdbState
->
rwLock
);
taosThreadRwlockUnlock
(
&
wrapper
->
rwLock
);
}
}
}
}
...
@@ -1014,8 +1066,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1014,8 +1066,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_t
**
readOpt
)
{
rocksdb_readoptions_t
**
readOpt
)
{
int
idx
=
streamStateGetCfIdx
(
pState
,
cfName
);
int
idx
=
streamStateGetCfIdx
(
pState
,
cfName
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
if
(
snapshot
!=
NULL
)
{
if
(
snapshot
!=
NULL
)
{
*
snapshot
=
(
rocksdb_snapshot_t
*
)
rocksdb_create_snapshot
(
pState
->
pTdbState
->
rocksdb
);
*
snapshot
=
(
rocksdb_snapshot_t
*
)
rocksdb_create_snapshot
(
wrapper
->
rocksdb
);
}
}
rocksdb_readoptions_t
*
rOpt
=
rocksdb_readoptions_create
();
rocksdb_readoptions_t
*
rOpt
=
rocksdb_readoptions_create
();
*
readOpt
=
rOpt
;
*
readOpt
=
rOpt
;
...
@@ -1023,8 +1076,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1023,8 +1076,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_set_snapshot
(
rOpt
,
*
snapshot
);
rocksdb_readoptions_set_snapshot
(
rOpt
,
*
snapshot
);
rocksdb_readoptions_set_fill_cache
(
rOpt
,
0
);
rocksdb_readoptions_set_fill_cache
(
rOpt
,
0
);
return
rocksdb_create_iterator_cf
(
pState
->
pTdbState
->
rocksdb
,
rOpt
,
return
rocksdb_create_iterator_cf
(
wrapper
->
rocksdb
,
rOpt
,
((
rocksdb_column_family_handle_t
**
)
wrapper
->
pHandle
)[
idx
]);
((
rocksdb_column_family_handle_t
**
)
pState
->
pTdbState
->
pHandle
)[
idx
]);
}
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
...
@@ -1038,13 +1090,13 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1038,13 +1090,13 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \
code = -1; \
break; \
break; \
} \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
char* ttlV = NULL; \
char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
...
@@ -1069,22 +1121,20 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1069,22 +1121,20 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \
code = -1; \
break; \
break; \
} \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = wrapper->readOpts; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL || len == 0) { \
if (val == NULL || len == 0) { \
if (err == NULL) { \
if (err == NULL) { \
qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
funcname); \
} else { \
} else { \
qError("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
qError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
err); \
taosMemoryFreeClear(err); \
taosMemoryFreeClear(err); \
} \
} \
code = -1; \
code = -1; \
...
@@ -1092,18 +1142,16 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1092,18 +1142,16 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* p = NULL; \
char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \
if (tlen <= 0) { \
qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString,
pState->pTdbState->idstr,
\
qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString,
wrapper->idstr,
\
funcname); \
funcname); \
code = -1; \
code = -1; \
} else { \
} else { \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
tlen); \
} \
} \
taosMemoryFree(val); \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
if (vLen != NULL) *vLen = tlen; \
} \
} \
if (code == 0) \
if (code == 0) qDebug("streamState str: %s succ to read from %s_%s", toString, wrapper->idstr, funcname); \
qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} while (0);
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
...
@@ -1117,21 +1165,20 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1117,21 +1165,20 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \
code = -1; \
break; \
break; \
} \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
if (err != NULL) { \
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
err); \
taosMemoryFree(err); \
taosMemoryFree(err); \
code = -1; \
code = -1; \
} else { \
} else { \
qTrace("streamState str: %s succ to del from %s_%s", toString,
pState->pTdbState->idstr, funcname);
\
qTrace("streamState str: %s succ to del from %s_%s", toString,
wrapper->idstr, funcname);
\
} \
} \
} while (0);
} while (0);
...
@@ -1158,6 +1205,7 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
...
@@ -1158,6 +1205,7 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int32_t
streamStateClear_rocksdb
(
SStreamState
*
pState
)
{
int32_t
streamStateClear_rocksdb
(
SStreamState
*
pState
)
{
qDebug
(
"streamStateClear_rocksdb"
);
qDebug
(
"streamStateClear_rocksdb"
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
char
sKeyStr
[
128
]
=
{
0
};
char
sKeyStr
[
128
]
=
{
0
};
char
eKeyStr
[
128
]
=
{
0
};
char
eKeyStr
[
128
]
=
{
0
};
SStateKey
sKey
=
{.
key
=
{.
ts
=
0
,
.
groupId
=
0
},
.
opNum
=
pState
->
number
};
SStateKey
sKey
=
{.
key
=
{.
ts
=
0
,
.
groupId
=
0
},
.
opNum
=
pState
->
number
};
...
@@ -1166,10 +1214,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
...
@@ -1166,10 +1214,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
int
sLen
=
stateKeyEncode
(
&
sKey
,
sKeyStr
);
int
sLen
=
stateKeyEncode
(
&
sKey
,
sKeyStr
);
int
eLen
=
stateKeyEncode
(
&
eKey
,
eKeyStr
);
int
eLen
=
stateKeyEncode
(
&
eKey
,
eKeyStr
);
if
(
pState
->
pTdbState
->
pHandle
[
1
]
!=
NULL
)
{
if
(
wrapper
->
pHandle
[
1
]
!=
NULL
)
{
char
*
err
=
NULL
;
char
*
err
=
NULL
;
rocksdb_delete_range_cf
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
writeOpts
,
pState
->
pTdbState
->
pHandle
[
1
]
,
rocksdb_delete_range_cf
(
wrapper
->
rocksdb
,
wrapper
->
writeOpts
,
wrapper
->
pHandle
[
1
],
sKeyStr
,
sLen
,
eKeyStr
,
eLen
,
sKeyStr
,
sLen
,
eKeyStr
,
eLen
,
&
err
);
&
err
);
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
char
toStringStart
[
128
]
=
{
0
};
char
toStringStart
[
128
]
=
{
0
};
char
toStringEnd
[
128
]
=
{
0
};
char
toStringEnd
[
128
]
=
{
0
};
...
@@ -1179,7 +1227,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
...
@@ -1179,7 +1227,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
qWarn
(
"failed to delete range cf(state) start: %s, end:%s, reason:%s"
,
toStringStart
,
toStringEnd
,
err
);
qWarn
(
"failed to delete range cf(state) start: %s, end:%s, reason:%s"
,
toStringStart
,
toStringEnd
,
err
);
taosMemoryFree
(
err
);
taosMemoryFree
(
err
);
}
else
{
}
else
{
rocksdb_compact_range_cf
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
pHandle
[
1
],
sKeyStr
,
sLen
,
eKeyStr
,
eLen
);
rocksdb_compact_range_cf
(
wrapper
->
rocksdb
,
wrapper
->
pHandle
[
1
],
sKeyStr
,
sLen
,
eKeyStr
,
eLen
);
}
}
}
}
...
@@ -1273,8 +1321,9 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
...
@@ -1273,8 +1321,9 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
...
@@ -1308,14 +1357,15 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
...
@@ -1308,14 +1357,15 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
SStreamStateCur
*
streamStateSeekToLast_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
streamStateSeekToLast_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
qDebug
(
"streamStateGetCur_rocksdb"
);
qDebug
(
"streamStateGetCur_rocksdb"
);
int32_t
code
=
0
;
int32_t
code
=
0
;
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
const
SStateKey
maxStateKey
=
{.
key
=
{.
groupId
=
UINT64_MAX
,
.
ts
=
INT64_MAX
},
.
opNum
=
INT64_MAX
};
const
SStateKey
maxStateKey
=
{.
key
=
{.
groupId
=
UINT64_MAX
,
.
ts
=
INT64_MAX
},
.
opNum
=
INT64_MAX
};
STREAM_STATE_PUT_ROCKSDB
(
pState
,
"state"
,
&
maxStateKey
,
""
,
0
);
STREAM_STATE_PUT_ROCKSDB
(
pState
,
"state"
,
&
maxStateKey
,
""
,
0
);
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int32_t
klen
=
stateKeyEncode
((
void
*
)
&
maxStateKey
,
buf
);
int32_t
klen
=
stateKeyEncode
((
void
*
)
&
maxStateKey
,
buf
);
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
if
(
pCur
==
NULL
)
return
NULL
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
(
size_t
)
klen
);
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
(
size_t
)
klen
);
...
@@ -1335,10 +1385,11 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
...
@@ -1335,10 +1385,11 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur
*
streamStateGetCur_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
streamStateGetCur_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
qDebug
(
"streamStateGetCur_rocksdb"
);
qDebug
(
"streamStateGetCur_rocksdb"
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
if
(
pCur
==
NULL
)
return
NULL
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
...
@@ -1426,12 +1477,14 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
...
@@ -1426,12 +1477,14 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
}
}
SStreamStateCur
*
streamStateSessionSeekKeyCurrentPrev_rocksdb
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
SStreamStateCur
*
streamStateSessionSeekKeyCurrentPrev_rocksdb
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
qDebug
(
"streamStateSessionSeekKeyCurrentPrev_rocksdb"
);
qDebug
(
"streamStateSessionSeekKeyCurrentPrev_rocksdb"
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
...
@@ -1467,11 +1520,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
...
@@ -1467,11 +1520,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
}
}
SStreamStateCur
*
streamStateSessionSeekKeyCurrentNext_rocksdb
(
SStreamState
*
pState
,
SSessionKey
*
key
)
{
SStreamStateCur
*
streamStateSessionSeekKeyCurrentNext_rocksdb
(
SStreamState
*
pState
,
SSessionKey
*
key
)
{
qDebug
(
"streamStateSessionSeekKeyCurrentNext_rocksdb"
);
qDebug
(
"streamStateSessionSeekKeyCurrentNext_rocksdb"
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
...
@@ -1504,11 +1558,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
...
@@ -1504,11 +1558,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur
*
streamStateSessionSeekKeyNext_rocksdb
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
SStreamStateCur
*
streamStateSessionSeekKeyNext_rocksdb
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
qDebug
(
"streamStateSessionSeekKeyNext_rocksdb"
);
qDebug
(
"streamStateSessionSeekKeyNext_rocksdb"
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
...
@@ -1598,10 +1653,11 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
...
@@ -1598,10 +1653,11 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur
*
streamStateFillGetCur_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
streamStateFillGetCur_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
qDebug
(
"streamStateFillGetCur_rocksdb"
);
qDebug
(
"streamStateFillGetCur_rocksdb"
);
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
if
(
pCur
==
NULL
)
return
NULL
;
if
(
pCur
==
NULL
)
return
NULL
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
...
@@ -1656,12 +1712,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
...
@@ -1656,12 +1712,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur
*
streamStateFillSeekKeyNext_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
streamStateFillSeekKeyNext_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
qDebug
(
"streamStateFillSeekKeyNext_rocksdb"
);
qDebug
(
"streamStateFillSeekKeyNext_rocksdb"
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
!
pCur
)
{
if
(
!
pCur
)
{
return
NULL
;
return
NULL
;
}
}
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
...
@@ -1692,12 +1749,13 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
...
@@ -1692,12 +1749,13 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
}
}
SStreamStateCur
*
streamStateFillSeekKeyPrev_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
streamStateFillSeekKeyPrev_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
qDebug
(
"streamStateFillSeekKeyPrev_rocksdb"
);
qDebug
(
"streamStateFillSeekKeyPrev_rocksdb"
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"fill"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
...
@@ -1728,12 +1786,13 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
...
@@ -1728,12 +1786,13 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
}
}
int32_t
streamStateSessionGetKeyByRange_rocksdb
(
SStreamState
*
pState
,
const
SSessionKey
*
key
,
SSessionKey
*
curKey
)
{
int32_t
streamStateSessionGetKeyByRange_rocksdb
(
SStreamState
*
pState
,
const
SSessionKey
*
key
,
SSessionKey
*
curKey
)
{
qDebug
(
"streamStateSessionGetKeyByRange_rocksdb"
);
qDebug
(
"streamStateSessionGetKeyByRange_rocksdb"
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
pCur
->
number
=
pState
->
number
;
pCur
->
number
=
pState
->
number
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"sess"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
...
@@ -1964,6 +2023,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
...
@@ -1964,6 +2023,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
int
code
=
0
;
int
code
=
0
;
char
*
err
=
NULL
;
char
*
err
=
NULL
;
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
rocksdb_snapshot_t
*
snapshot
=
NULL
;
rocksdb_snapshot_t
*
snapshot
=
NULL
;
rocksdb_readoptions_t
*
readopts
=
NULL
;
rocksdb_readoptions_t
*
readopts
=
NULL
;
rocksdb_iterator_t
*
pIter
=
streamStateIterCreate
(
pState
,
"default"
,
&
snapshot
,
&
readopts
);
rocksdb_iterator_t
*
pIter
=
streamStateIterCreate
(
pState
,
"default"
,
&
snapshot
,
&
readopts
);
...
@@ -1996,15 +2056,16 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
...
@@ -1996,15 +2056,16 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
}
}
rocksdb_iter_next
(
pIter
);
rocksdb_iter_next
(
pIter
);
}
}
rocksdb_release_snapshot
(
pState
->
pTdbState
->
rocksdb
,
snapshot
);
rocksdb_release_snapshot
(
wrapper
->
rocksdb
,
snapshot
);
rocksdb_readoptions_destroy
(
readopts
);
rocksdb_readoptions_destroy
(
readopts
);
rocksdb_iter_destroy
(
pIter
);
rocksdb_iter_destroy
(
pIter
);
return
code
;
return
code
;
}
}
void
*
streamDefaultIterCreate_rocksdb
(
SStreamState
*
pState
)
{
void
*
streamDefaultIterCreate_rocksdb
(
SStreamState
*
pState
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
wrapper
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"default"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"default"
,
(
rocksdb_snapshot_t
**
)
&
pCur
->
snapshot
,
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
(
rocksdb_readoptions_t
**
)
&
pCur
->
readOpt
);
return
pCur
;
return
pCur
;
...
@@ -2051,6 +2112,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
...
@@ -2051,6 +2112,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
void
streamStateDestroyBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_destroy
((
rocksdb_writebatch_t
*
)
pBatch
);
}
void
streamStateDestroyBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_destroy
((
rocksdb_writebatch_t
*
)
pBatch
);
}
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
void
*
val
,
int32_t
vlen
,
int64_t
ttl
)
{
void
*
val
,
int32_t
vlen
,
int64_t
ttl
)
{
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
int
i
=
streamStateGetCfIdx
(
pState
,
cfName
);
int
i
=
streamStateGetCfIdx
(
pState
,
cfName
);
if
(
i
<
0
)
{
if
(
i
<
0
)
{
...
@@ -2062,7 +2124,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
...
@@ -2062,7 +2124,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
char
*
ttlV
=
NULL
;
char
*
ttlV
=
NULL
;
int32_t
ttlVLen
=
ginitDict
[
i
].
enValueFunc
(
val
,
vlen
,
ttl
,
&
ttlV
);
int32_t
ttlVLen
=
ginitDict
[
i
].
enValueFunc
(
val
,
vlen
,
ttl
,
&
ttlV
);
rocksdb_column_family_handle_t
*
pCf
=
pState
->
pTdbState
->
pHandle
[
ginitDict
[
i
].
idx
];
rocksdb_column_family_handle_t
*
pCf
=
wrapper
->
pHandle
[
ginitDict
[
i
].
idx
];
rocksdb_writebatch_put_cf
((
rocksdb_writebatch_t
*
)
pBatch
,
pCf
,
buf
,
(
size_t
)
klen
,
ttlV
,
(
size_t
)
ttlVLen
);
rocksdb_writebatch_put_cf
((
rocksdb_writebatch_t
*
)
pBatch
,
pCf
,
buf
,
(
size_t
)
klen
,
ttlV
,
(
size_t
)
ttlVLen
);
taosMemoryFree
(
ttlV
);
taosMemoryFree
(
ttlV
);
return
0
;
return
0
;
...
@@ -2074,7 +2136,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
...
@@ -2074,7 +2136,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
char
*
ttlV
=
tmpBuf
;
char
*
ttlV
=
tmpBuf
;
int32_t
ttlVLen
=
ginitDict
[
cfIdx
].
enValueFunc
(
val
,
vlen
,
ttl
,
&
ttlV
);
int32_t
ttlVLen
=
ginitDict
[
cfIdx
].
enValueFunc
(
val
,
vlen
,
ttl
,
&
ttlV
);
rocksdb_column_family_handle_t
*
pCf
=
pState
->
pTdbState
->
pHandle
[
ginitDict
[
cfIdx
].
idx
];
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
rocksdb_column_family_handle_t
*
pCf
=
wrapper
->
pHandle
[
ginitDict
[
cfIdx
].
idx
];
rocksdb_writebatch_put_cf
((
rocksdb_writebatch_t
*
)
pBatch
,
pCf
,
buf
,
(
size_t
)
klen
,
ttlV
,
(
size_t
)
ttlVLen
);
rocksdb_writebatch_put_cf
((
rocksdb_writebatch_t
*
)
pBatch
,
pCf
,
buf
,
(
size_t
)
klen
,
ttlV
,
(
size_t
)
ttlVLen
);
if
(
tmpBuf
==
NULL
)
{
if
(
tmpBuf
==
NULL
)
{
...
@@ -2084,7 +2148,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
...
@@ -2084,7 +2148,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
}
}
int32_t
streamStatePutBatch_rocksdb
(
SStreamState
*
pState
,
void
*
pBatch
)
{
int32_t
streamStatePutBatch_rocksdb
(
SStreamState
*
pState
,
void
*
pBatch
)
{
char
*
err
=
NULL
;
char
*
err
=
NULL
;
rocksdb_write
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
writeOpts
,
(
rocksdb_writebatch_t
*
)
pBatch
,
&
err
);
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
rocksdb_write
(
wrapper
->
rocksdb
,
wrapper
->
writeOpts
,
(
rocksdb_writebatch_t
*
)
pBatch
,
&
err
);
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
qError
(
"streamState failed to write batch, err:%s"
,
err
);
qError
(
"streamState failed to write batch, err:%s"
,
err
);
taosMemoryFree
(
err
);
taosMemoryFree
(
err
);
...
@@ -2092,3 +2157,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
...
@@ -2092,3 +2157,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
}
}
return
0
;
return
0
;
}
}
uint32_t
nextPow2
(
uint32_t
x
)
{
x
=
x
-
1
;
x
=
x
|
(
x
>>
1
);
x
=
x
|
(
x
>>
2
);
x
=
x
|
(
x
>>
4
);
x
=
x
|
(
x
>>
8
);
x
=
x
|
(
x
>>
16
);
return
x
+
1
;
}
\ No newline at end of file
source/libs/stream/src/streamMeta.c
浏览文件 @
0eeaab9c
...
@@ -21,10 +21,18 @@
...
@@ -21,10 +21,18 @@
static
TdThreadOnce
streamMetaModuleInit
=
PTHREAD_ONCE_INIT
;
static
TdThreadOnce
streamMetaModuleInit
=
PTHREAD_ONCE_INIT
;
int32_t
streamBackendId
=
0
;
int32_t
streamBackendId
=
0
;
static
void
streamMetaEnvInit
()
{
streamBackendId
=
taosOpenRef
(
20
,
streamBackendCleanup
);
}
int32_t
streamBackendCfWrapperId
=
0
;
static
void
streamMetaEnvInit
()
{
streamBackendId
=
taosOpenRef
(
64
,
streamBackendCleanup
);
streamBackendCfWrapperId
=
taosOpenRef
(
64
,
streamBackendHandleCleanup
);
}
void
streamMetaInit
()
{
taosThreadOnce
(
&
streamMetaModuleInit
,
streamMetaEnvInit
);
}
void
streamMetaInit
()
{
taosThreadOnce
(
&
streamMetaModuleInit
,
streamMetaEnvInit
);
}
void
streamMetaCleanup
()
{
taosCloseRef
(
streamBackendId
);
}
void
streamMetaCleanup
()
{
taosCloseRef
(
streamBackendId
);
taosCloseRef
(
streamBackendCfWrapperId
);
}
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
)
{
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
...
@@ -93,10 +101,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
...
@@ -93,10 +101,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto
_err
;
goto
_err
;
}
}
pMeta
->
streamBackendRid
=
taosAddRef
(
streamBackendId
,
pMeta
->
streamBackend
);
pMeta
->
streamBackendRid
=
taosAddRef
(
streamBackendId
,
pMeta
->
streamBackend
);
pMeta
->
pTaskBackendUnique
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
taosMemoryFree
(
streamPath
);
taosMemoryFree
(
streamPath
);
taosInitRWLatch
(
&
pMeta
->
lock
);
taosInitRWLatch
(
&
pMeta
->
lock
);
taosThreadMutexInit
(
&
pMeta
->
backendMutex
,
NULL
);
return
pMeta
;
return
pMeta
;
_err:
_err:
...
@@ -139,6 +151,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
...
@@ -139,6 +151,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosRemoveRef
(
streamBackendId
,
pMeta
->
streamBackendRid
);
taosRemoveRef
(
streamBackendId
,
pMeta
->
streamBackendRid
);
pMeta
->
pTaskList
=
taosArrayDestroy
(
pMeta
->
pTaskList
);
pMeta
->
pTaskList
=
taosArrayDestroy
(
pMeta
->
pTaskList
);
taosMemoryFree
(
pMeta
->
path
);
taosMemoryFree
(
pMeta
->
path
);
taosThreadMutexDestroy
(
&
pMeta
->
backendMutex
);
taosHashCleanup
(
pMeta
->
pTaskBackendUnique
);
taosMemoryFree
(
pMeta
);
taosMemoryFree
(
pMeta
);
}
}
...
...
source/libs/stream/src/streamState.c
浏览文件 @
0eeaab9c
...
@@ -116,16 +116,33 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
...
@@ -116,16 +116,33 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
pState
->
taskId
=
pStreamTask
->
id
.
taskId
;
pState
->
taskId
=
pStreamTask
->
id
.
taskId
;
pState
->
streamId
=
pStreamTask
->
id
.
streamId
;
pState
->
streamId
=
pStreamTask
->
id
.
streamId
;
sprintf
(
pState
->
pTdbState
->
idstr
,
"0x%"
PRIx64
"-%d"
,
pState
->
streamId
,
pState
->
taskId
);
#ifdef USE_ROCKSDB
#ifdef USE_ROCKSDB
SStreamMeta
*
pMeta
=
pStreamTask
->
pMeta
;
SStreamMeta
*
pMeta
=
pStreamTask
->
pMeta
;
pState
->
streamBackendRid
=
pMeta
->
streamBackendRid
;
pState
->
streamBackendRid
=
pMeta
->
streamBackendRid
;
// taosWLockLatch(&pMeta->lock);
taosThreadMutexLock
(
&
pMeta
->
backendMutex
);
void
*
uniqueId
=
taosHashGet
(
pMeta
->
pTaskBackendUnique
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
if
(
uniqueId
==
NULL
)
{
int
code
=
streamStateOpenBackend
(
pMeta
->
streamBackend
,
pState
);
int
code
=
streamStateOpenBackend
(
pMeta
->
streamBackend
,
pState
);
if
(
code
==
-
1
)
{
if
(
code
==
-
1
)
{
taosReleaseRef
(
streamBackendId
,
pMeta
->
streamBackendRid
);
taosReleaseRef
(
streamBackendId
,
pState
->
streamBackendRid
);
taosThreadMutexUnlock
(
&
pMeta
->
backendMutex
);
taosMemoryFree
(
pState
);
taosMemoryFree
(
pState
);
pState
=
NULL
;
return
NULL
;
}
}
taosHashPut
(
pMeta
->
pTaskBackendUnique
,
pState
->
pTdbState
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
,
&
pState
->
pTdbState
->
backendCfWrapperId
,
sizeof
(
pState
->
pTdbState
->
backendCfWrapperId
));
}
else
{
int64_t
id
=
*
(
int64_t
*
)
uniqueId
;
pState
->
pTdbState
->
backendCfWrapperId
=
id
;
pState
->
pTdbState
->
pBackendCfWrapper
=
taosAcquireRef
(
streamBackendCfWrapperId
,
id
);
taosAcquireRef
(
streamBackendId
,
pState
->
streamBackendRid
);
}
taosThreadMutexUnlock
(
&
pMeta
->
backendMutex
);
pState
->
pTdbState
->
pOwner
=
pTask
;
pState
->
pTdbState
->
pOwner
=
pTask
;
pState
->
pFileState
=
NULL
;
pState
->
pFileState
=
NULL
;
...
@@ -1113,9 +1130,7 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
...
@@ -1113,9 +1130,7 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
#endif
#endif
}
}
void
streamStateReloadInfo
(
SStreamState
*
pState
,
TSKEY
ts
)
{
void
streamStateReloadInfo
(
SStreamState
*
pState
,
TSKEY
ts
)
{
streamFileStateReloadInfo
(
pState
->
pFileState
,
ts
);
}
streamFileStateReloadInfo
(
pState
->
pFileState
,
ts
);
}
#if 0
#if 0
char* streamStateSessionDump(SStreamState* pState) {
char* streamStateSessionDump(SStreamState* pState) {
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录