Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6a9c0330
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6a9c0330
编写于
6月 02, 2023
作者:
H
Haojun Liao
提交者:
GitHub
6月 02, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21553 from taosdata/enh/changeRocksParam
change log level
上级
5cdd344f
420ae737
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
62 addition
and
52 deletion
+62
-52
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+5
-3
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+25
-25
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+1
-1
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+31
-23
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
6a9c0330
...
...
@@ -206,7 +206,7 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
void
*
streamQueueNextItem
(
SStreamQueue
*
queue
);
SStreamDataSubmit
*
streamDataSubmitNew
(
SPackedData
*
pData
,
int32_t
type
);
void
streamDataSubmitDestroy
(
SStreamDataSubmit
*
pDataSubmit
);
void
streamDataSubmitDestroy
(
SStreamDataSubmit
*
pDataSubmit
);
SStreamDataSubmit
*
streamSubmitBlockClone
(
SStreamDataSubmit
*
pSubmit
);
...
...
@@ -284,7 +284,7 @@ struct SStreamTask {
int16_t
dispatchMsgType
;
SStreamStatus
status
;
int32_t
selfChildId
;
int32_t
nodeId
;
// vgroup id
int32_t
nodeId
;
// vgroup id
SEpSet
epSet
;
SCheckpointInfo
chkInfo
;
STaskExec
exec
;
...
...
@@ -346,12 +346,14 @@ typedef struct SStreamMeta {
void
*
streamBackend
;
int32_t
streamBackendId
;
int64_t
streamBackendRid
;
SHashObj
*
pTaskBackendUnique
;
}
SStreamMeta
;
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
int32_t
tDecodeStreamEpInfo
(
SDecoder
*
pDecoder
,
SStreamChildEpInfo
*
pInfo
);
SStreamTask
*
tNewStreamTask
(
int64_t
streamId
,
int8_t
taskLevel
,
int8_t
fillHistory
,
int64_t
triggerParam
,
SArray
*
pTaskList
);
SStreamTask
*
tNewStreamTask
(
int64_t
streamId
,
int8_t
taskLevel
,
int8_t
fillHistory
,
int64_t
triggerParam
,
SArray
*
pTaskList
);
int32_t
tEncodeStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
);
int32_t
tDecodeStreamTask
(
SDecoder
*
pDecoder
,
SStreamTask
*
pTask
);
void
tFreeStreamTask
(
SStreamTask
*
pTask
);
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
6a9c0330
...
...
@@ -128,7 +128,9 @@ void* streamBackendInit(const char* path) {
*/
streamStateOpenBackendCf
(
pHandle
,
(
char
*
)
path
,
cfs
,
nCf
);
}
rocksdb_list_column_families_destroy
(
cfs
,
nCf
);
if
(
cfs
!=
NULL
)
{
rocksdb_list_column_families_destroy
(
cfs
,
nCf
);
}
return
(
void
*
)
pHandle
;
_EXIT:
...
...
@@ -719,7 +721,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
qDebug
(
"succ to open rocksdb cf"
);
}
// close default cf
rocksdb_column_family_handle_destroy
(
cfHandle
[
0
]);
if
(((
rocksdb_column_family_handle_t
**
)
cfHandle
)[
0
]
!=
0
)
rocksdb_column_family_handle_destroy
(
cfHandle
[
0
]);
rocksdb_options_destroy
(
cfOpts
[
0
]);
handle
->
db
=
db
;
...
...
@@ -1026,10 +1028,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
taosMemoryFree(err); \
q
Debug
("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
q
Error
("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
q
Debug
("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
q
Trace
("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
} \
taosMemoryFree(ttlV); \
} while (0);
...
...
@@ -1056,10 +1058,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
if (err == NULL) { \
q
Debug
("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
q
Trace
("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
funcname); \
} else { \
q
Debug
("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
q
Error
("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
err); \
taosMemoryFreeClear(err); \
} \
...
...
@@ -1068,11 +1070,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* p = NULL; \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (len < 0) { \
q
Debug
("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
q
Error
("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
funcname); \
code = -1; \
} else { \
q
Debug
("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
q
Trace
("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
len); \
} \
taosMemoryFree(val); \
...
...
@@ -1107,7 +1109,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(err); \
code = -1; \
} else { \
q
Debug
("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \
q
Trace
("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} \
} while (0);
...
...
@@ -1134,31 +1136,29 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int32_t
streamStateClear_rocksdb
(
SStreamState
*
pState
)
{
qDebug
(
"streamStateClear_rocksdb"
);
SStateKey
sKey
=
{.
key
=
{.
ts
=
0
,
.
groupId
=
0
},
.
opNum
=
pState
->
number
};
SStateKey
eKey
=
{.
key
=
{.
ts
=
INT64_MAX
,
.
groupId
=
UINT64_MAX
},
.
opNum
=
pState
->
number
};
char
sKeyStr
[
128
]
=
{
0
};
char
eKeyStr
[
128
]
=
{
0
};
SStateKey
sKey
=
{.
key
=
{.
ts
=
0
,
.
groupId
=
0
},
.
opNum
=
pState
->
number
};
SStateKey
eKey
=
{.
key
=
{.
ts
=
INT64_MAX
,
.
groupId
=
UINT64_MAX
},
.
opNum
=
pState
->
number
};
int
sLen
=
stateKeyEncode
(
&
sKey
,
sKeyStr
);
int
eLen
=
stateKeyEncode
(
&
eKey
,
eKeyStr
);
char
toStringStart
[
128
]
=
{
0
};
char
toStringEnd
[
128
]
=
{
0
};
if
(
qDebugFlag
&
DEBUG_TRACE
)
{
stateKeyToString
(
&
sKey
,
toStringStart
);
stateKeyToString
(
&
eKey
,
toStringEnd
);
}
char
*
err
=
NULL
;
if
(
pState
->
pTdbState
->
pHandle
[
1
]
!=
NULL
)
{
char
*
err
=
NULL
;
rocksdb_delete_range_cf
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
writeOpts
,
pState
->
pTdbState
->
pHandle
[
1
],
sKeyStr
,
sLen
,
eKeyStr
,
eLen
,
&
err
);
}
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if
(
err
!=
NULL
)
{
qWarn
(
"failed to delete range cf(state) start: %s, end:%s, reason:%s"
,
toStringStart
,
toStringEnd
,
err
);
taosMemoryFree
(
err
);
if
(
err
!=
NULL
)
{
char
toStringStart
[
128
]
=
{
0
};
char
toStringEnd
[
128
]
=
{
0
};
stateKeyToString
(
&
sKey
,
toStringStart
);
stateKeyToString
(
&
eKey
,
toStringEnd
);
qWarn
(
"failed to delete range cf(state) start: %s, end:%s, reason:%s"
,
toStringStart
,
toStringEnd
,
err
);
taosMemoryFree
(
err
);
}
else
{
rocksdb_compact_range_cf
(
pState
->
pTdbState
->
rocksdb
,
pState
->
pTdbState
->
pHandle
[
1
],
sKeyStr
,
sLen
,
eKeyStr
,
eLen
);
}
}
return
0
;
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
6a9c0330
...
...
@@ -157,7 +157,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
tEncodeStreamRetrieveReq
(
&
encoder
,
&
req
);
tEncoderClear
(
&
encoder
);
SRpcMsg
rpcMsg
=
{
.
code
=
0
,
.
msgType
=
TDMT_STREAM_RETRIEVE
,
.
pCont
=
buf
,
.
contLen
=
sizeof
(
SMsgHead
)
+
len
};
SRpcMsg
rpcMsg
=
{
.
code
=
0
,
.
msgType
=
TDMT_STREAM_RETRIEVE
,
.
pCont
=
buf
,
.
contLen
=
sizeof
(
SMsgHead
)
+
len
};
if
(
tmsgSendReq
(
&
pEpInfo
->
epSet
,
&
rpcMsg
)
<
0
)
{
ASSERT
(
0
);
goto
CLEAR
;
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
6a9c0330
...
...
@@ -205,24 +205,25 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
// add to the ready tasks hash map, not the restored tasks hash map
int32_t
streamMetaAddDeployedTask
(
SStreamMeta
*
pMeta
,
int64_t
ver
,
SStreamTask
*
pTask
)
{
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
,
ver
)
<
0
)
{
tFreeStreamTask
(
pTask
);
return
-
1
;
}
if
(
streamMetaSaveTask
(
pMeta
,
pTask
)
<
0
)
{
tFreeStreamTask
(
pTask
);
return
-
1
;
}
if
(
streamMetaCommit
(
pMeta
)
<
0
)
{
tFreeStreamTask
(
pTask
);
return
-
1
;
}
void
*
p
=
taosHashGet
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
pTask
->
id
.
taskId
));
if
(
p
==
NULL
)
{
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
,
ver
)
<
0
)
{
tFreeStreamTask
(
pTask
);
return
-
1
;
}
if
(
streamMetaSaveTask
(
pMeta
,
pTask
)
<
0
)
{
tFreeStreamTask
(
pTask
);
return
-
1
;
}
if
(
streamMetaCommit
(
pMeta
)
<
0
)
{
tFreeStreamTask
(
pTask
);
return
-
1
;
}
taosArrayPush
(
pMeta
->
pTaskList
,
&
pTask
->
id
.
taskId
);
}
else
{
return
0
;
}
taosHashPut
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
pTask
->
id
.
taskId
),
&
pTask
,
POINTER_BYTES
);
...
...
@@ -359,22 +360,29 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tDecodeStreamTask
(
&
decoder
,
pTask
);
tDecoderClear
(
&
decoder
);
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
,
pTask
->
chkInfo
.
version
)
<
0
)
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
return
-
1
;
}
// remove duplicate
void
*
p
=
taosHashGet
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
pTask
->
id
.
taskId
));
if
(
p
==
NULL
)
{
if
(
pMeta
->
expandFunc
(
pMeta
->
ahandle
,
pTask
,
pTask
->
chkInfo
.
version
)
<
0
)
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
taosMemoryFree
(
pTask
);
return
-
1
;
}
taosArrayPush
(
pMeta
->
pTaskList
,
&
pTask
->
id
.
taskId
);
}
else
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
taosMemoryFree
(
pTask
);
continue
;
}
if
(
taosHashPut
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
pTask
->
id
.
taskId
),
&
pTask
,
sizeof
(
void
*
))
<
0
)
{
tdbFree
(
pKey
);
tdbFree
(
pVal
);
tdbTbcClose
(
pCur
);
taosMemoryFree
(
pTask
);
return
-
1
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录