Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f7a7743a
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f7a7743a
编写于
5月 05, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix case failure
上级
a1b9dcf5
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
26 addition
and
6 deletion
+26
-6
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+1
-1
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+2
-0
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+5
-3
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+8
-2
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+10
-0
未找到文件。
include/libs/stream/streamState.h
浏览文件 @
f7a7743a
...
...
@@ -66,7 +66,7 @@ typedef struct {
SSHashObj
*
parNameMap
;
int64_t
checkPointId
;
int32_t
taskId
;
int
32
_t
streamId
;
int
64
_t
streamId
;
}
SStreamState
;
SStreamState
*
streamStateOpen
(
char
*
path
,
struct
SStreamTask
*
pTask
,
bool
specPath
,
int32_t
szPage
,
int32_t
pages
);
...
...
include/libs/stream/tstream.h
浏览文件 @
f7a7743a
...
...
@@ -347,6 +347,8 @@ typedef struct SStreamMeta {
SRWLatch
lock
;
int32_t
walScanCounter
;
void
*
streamBackend
;
int32_t
streamBackendId
;
int64_t
streamBackendRid
;
}
SStreamMeta
;
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
f7a7743a
...
...
@@ -617,10 +617,10 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c
}
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
)
{
qInfo
(
"start to open backend, %p
, %d
-%d"
,
pState
,
pState
->
streamId
,
pState
->
taskId
);
qInfo
(
"start to open backend, %p
0x%"
PRIx64
"
-%d"
,
pState
,
pState
->
streamId
,
pState
->
taskId
);
SBackendHandle
*
handle
=
backend
;
sprintf
(
pState
->
pTdbState
->
idstr
,
"
%d
-%d"
,
pState
->
streamId
,
pState
->
taskId
);
sprintf
(
pState
->
pTdbState
->
idstr
,
"
0x%"
PRIx64
"
-%d"
,
pState
->
streamId
,
pState
->
taskId
);
char
*
err
=
NULL
;
int
cfLen
=
sizeof
(
ginitDict
)
/
sizeof
(
ginitDict
[
0
]);
...
...
@@ -671,12 +671,14 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
SCfComparator
compare
=
{.
comp
=
pCompare
,
.
numOfComp
=
cfLen
};
pState
->
pTdbState
->
pComparNode
=
streamBackendAddCompare
(
handle
,
&
compare
);
rocksdb_writeoptions_disable_WAL
(
pState
->
pTdbState
->
writeOpts
,
1
);
qInfo
(
"succ to open backend, %p, 0x%"
PRIx64
"-%d"
,
pState
,
pState
->
streamId
,
pState
->
taskId
);
return
0
;
}
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
)
{
char
*
status
[]
=
{
"close"
,
"drop"
};
qInfo
(
"start to %s backend, %p, %d-%d"
,
status
[
remove
==
false
?
0
:
1
],
pState
,
pState
->
streamId
,
pState
->
taskId
);
qInfo
(
"start to %s backend, %p, 0x%"
PRIx64
"-%d"
,
status
[
remove
==
false
?
0
:
1
],
pState
,
pState
->
streamId
,
pState
->
taskId
);
if
(
pState
->
pTdbState
->
rocksdb
==
NULL
)
{
return
;
}
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
f7a7743a
...
...
@@ -16,6 +16,7 @@
#include "executor.h"
#include "streamBackendRocksdb.h"
#include "streamInc.h"
#include "tref.h"
#include "ttimer.h"
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
)
{
...
...
@@ -77,6 +78,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
}
pMeta
->
streamBackend
=
streamBackendInit
(
statePath
);
pMeta
->
streamBackendId
=
taosOpenRef
(
20
,
streamBackendCleanup
);
pMeta
->
streamBackendRid
=
taosAddRef
(
pMeta
->
streamBackendId
,
pMeta
->
streamBackend
);
taosMemoryFree
(
statePath
);
taosInitRWLatch
(
&
pMeta
->
lock
);
...
...
@@ -88,7 +92,7 @@ _err:
if
(
pMeta
->
pTaskDb
)
tdbTbClose
(
pMeta
->
pTaskDb
);
if
(
pMeta
->
pCheckpointDb
)
tdbTbClose
(
pMeta
->
pCheckpointDb
);
if
(
pMeta
->
db
)
tdbClose
(
pMeta
->
db
);
if
(
pMeta
->
streamBackend
)
streamBackendCleanup
(
pMeta
->
streamBackend
);
//
if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend);
taosMemoryFree
(
pMeta
);
return
NULL
;
}
...
...
@@ -116,7 +120,9 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup
(
pMeta
->
pTasks
);
streamBackendCleanup
(
pMeta
->
streamBackend
);
taosRemoveRef
(
pMeta
->
streamBackendId
,
pMeta
->
streamBackendRid
);
// streamBackendCleanup(pMeta->streamBackend);
taosCloseRef
(
pMeta
->
streamBackendId
);
taosMemoryFree
(
pMeta
->
path
);
taosMemoryFree
(
pMeta
);
}
...
...
source/libs/stream/src/streamState.c
浏览文件 @
f7a7743a
...
...
@@ -22,6 +22,7 @@
#include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tref.h"
#include "ttimer.h"
#define MAX_TABLE_NAME_NUM 100000
...
...
@@ -92,6 +93,10 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
,
bool
specPath
,
int32_t
szPage
,
int32_t
pages
)
{
qWarn
(
"open stream state, %s"
,
path
);
if
(
pTask
==
NULL
)
{
qWarn
(
"failed to open stream state, %s"
,
path
);
return
NULL
;
}
SStreamState
*
pState
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamState
));
if
(
pState
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -115,8 +120,10 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState
->
streamId
=
pTask
->
id
.
streamId
;
#ifdef USE_ROCKSDB
qWarn
(
"open stream state1"
);
taosAcquireRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
int
code
=
streamStateOpenBackend
(
pTask
->
pMeta
->
streamBackend
,
pState
);
if
(
code
==
-
1
)
{
taosReleaseRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
taosMemoryFree
(
pState
);
pState
=
NULL
;
}
...
...
@@ -125,6 +132,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState
->
pFileState
=
NULL
;
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
);
pState
->
parNameMap
=
tSimpleHashInit
(
1024
,
hashFn
);
return
pState
;
#else
...
...
@@ -213,6 +221,7 @@ _err:
}
void
streamStateClose
(
SStreamState
*
pState
,
bool
remove
)
{
SStreamTask
*
pTask
=
pState
->
pTdbState
->
pOwner
;
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy
(
pState
,
remove
);
...
...
@@ -227,6 +236,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
tdbTbClose
(
pState
->
pTdbState
->
pParTagDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
#endif
taosReleaseRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
}
int32_t
streamStateBegin
(
SStreamState
*
pState
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录