Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
jobily
TDengine
提交
8645f763
T
TDengine
项目概览
jobily
/
TDengine
9 个月 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
8645f763
编写于
8月 31, 2023
作者:
H
Haojun Liao
提交者:
GitHub
8月 31, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #22661 from taosdata/enh/reserve
stream change ver
上级
7bd54392
83a5e2be
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
93 addition
and
81 deletion
+93
-81
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+2
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+33
-31
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+4
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+16
-13
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+38
-36
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
8645f763
...
...
@@ -31,7 +31,7 @@ extern "C" {
typedef
struct
SStreamTask
SStreamTask
;
#define SSTREAM_TASK_VER
1
#define SSTREAM_TASK_VER
2
enum
{
STREAM_STATUS__NORMAL
=
0
,
STREAM_STATUS__STOP
,
...
...
@@ -371,6 +371,7 @@ struct SStreamTask {
int32_t
transferStateAlignCnt
;
struct
SStreamMeta
*
pMeta
;
SSHashObj
*
pNameMap
;
char
reserve
[
256
];
};
typedef
struct
SMetaHbInfo
{
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
8645f763
...
...
@@ -615,25 +615,25 @@ void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
);
void
*
tDecodeSubscribeObj
(
const
void
*
buf
,
SMqSubscribeObj
*
pSub
,
int8_t
sver
);
//typedef struct {
// int32_t epoch;
// SArray* consumers; // SArray<SMqConsumerEp*>
//} SMqSubActionLogEntry;
//SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
//int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
//void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);
//
typedef struct {
//
int32_t epoch;
//
SArray* consumers; // SArray<SMqConsumerEp*>
//
} SMqSubActionLogEntry;
//
SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
//
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
//
int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
//
void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);
//
//typedef struct {
// char key[TSDB_SUBSCRIBE_KEY_LEN];
// SArray* logs; // SArray<SMqSubActionLogEntry*>
//} SMqSubActionLogObj;
//
typedef struct {
//
char key[TSDB_SUBSCRIBE_KEY_LEN];
//
SArray* logs; // SArray<SMqSubActionLogEntry*>
//
} SMqSubActionLogObj;
//
//SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
//int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
//void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
//
SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
//
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
//
int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
//
void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
typedef
struct
{
int32_t
oldConsumerNum
;
...
...
@@ -647,12 +647,12 @@ typedef struct {
}
SMqRebOutputVg
;
typedef
struct
{
SArray
*
rebVgs
;
// SArray<SMqRebOutputVg>
SArray
*
newConsumers
;
// SArray<int64_t>
SArray
*
removedConsumers
;
// SArray<int64_t>
SArray
*
modifyConsumers
;
// SArray<int64_t>
SMqSubscribeObj
*
pSub
;
// SMqSubActionLogEntry* pLogEntry;
SArray
*
rebVgs
;
// SArray<SMqRebOutputVg>
SArray
*
newConsumers
;
// SArray<int64_t>
SArray
*
removedConsumers
;
// SArray<int64_t>
SArray
*
modifyConsumers
;
// SArray<int64_t>
SMqSubscribeObj
*
pSub
;
// SMqSubActionLogEntry* pLogEntry;
}
SMqRebOutputObj
;
typedef
struct
SStreamConf
{
...
...
@@ -674,8 +674,8 @@ typedef struct {
int32_t
totalLevel
;
int64_t
smaId
;
// 0 for unused
// info
int64_t
uid
;
int8_t
status
;
int64_t
uid
;
int8_t
status
;
SStreamConf
conf
;
// source and target
int64_t
sourceDbUid
;
...
...
@@ -690,13 +690,13 @@ typedef struct {
int32_t
fixedSinkVgId
;
// 0 for shuffle
// transformation
char
*
sql
;
char
*
ast
;
char
*
physicalPlan
;
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
char
*
sql
;
char
*
ast
;
char
*
physicalPlan
;
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
SArray
*
pHTasksList
;
// generate the results for already stored ts data
int64_t
hTaskUid
;
// stream task for history ts data
SArray
*
pHTasksList
;
// generate the results for already stored ts data
int64_t
hTaskUid
;
// stream task for history ts data
SSchemaWrapper
outputSchema
;
SSchemaWrapper
tagSchema
;
...
...
@@ -709,6 +709,8 @@ typedef struct {
// 3.0.5.
int64_t
checkpointId
;
char
reserve
[
256
];
}
SStreamObj
;
int32_t
tEncodeSStreamObj
(
SEncoder
*
pEncoder
,
const
SStreamObj
*
pObj
);
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
8645f763
...
...
@@ -84,6 +84,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
// 3.0.50 ver = 3
if
(
tEncodeI64
(
pEncoder
,
pObj
->
checkpointId
)
<
0
)
return
-
1
;
if
(
tEncodeCStrWithLen
(
pEncoder
,
pObj
->
reserve
,
sizeof
(
pObj
->
reserve
)
-
1
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
...
...
@@ -157,6 +159,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if
(
sver
>=
3
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
checkpointId
)
<
0
)
return
-
1
;
}
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
reserve
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
8645f763
...
...
@@ -14,6 +14,7 @@
*/
#include "mndStream.h"
#include "audit.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
...
...
@@ -28,9 +29,8 @@
#include "parser.h"
#include "tmisce.h"
#include "tname.h"
#include "audit.h"
#define MND_STREAM_VER_NUMBER
3
#define MND_STREAM_VER_NUMBER
4
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
...
...
@@ -874,15 +874,18 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
char
detail
[
2000
]
=
{
0
};
sprintf
(
detail
,
"checkpointFreq:%"
PRId64
", createStb:%d, deleteMark:%"
PRId64
", "
sprintf
(
detail
,
"checkpointFreq:%"
PRId64
", createStb:%d, deleteMark:%"
PRId64
", "
"fillHistory:%d, igExists:%d, "
"igExpired:%d, igUpdate:%d, lastTs:%"
PRId64
", "
"maxDelay:%"
PRId64
", numOfTags:%d, sourceDB:%s, "
"igExpired:%d, igUpdate:%d, lastTs:%"
PRId64
", "
"maxDelay:%"
PRId64
", numOfTags:%d, sourceDB:%s, "
"targetStbFullName:%s, triggerType:%d, watermark:%"
PRId64
,
createStreamReq
.
checkpointFreq
,
createStreamReq
.
createStb
,
createStreamReq
.
deleteMark
,
createStreamReq
.
fillHistory
,
createStreamReq
.
igExists
,
createStreamReq
.
igExpired
,
createStreamReq
.
igUpdate
,
createStreamReq
.
lastTs
,
createStreamReq
.
maxDelay
,
createStreamReq
.
numOfTags
,
createStreamReq
.
sourceDB
,
createStreamReq
.
fillHistory
,
createStreamReq
.
igExists
,
createStreamReq
.
igExpired
,
createStreamReq
.
igUpdate
,
createStreamReq
.
lastTs
,
createStreamReq
.
maxDelay
,
createStreamReq
.
numOfTags
,
createStreamReq
.
sourceDB
,
createStreamReq
.
targetStbFullName
,
createStreamReq
.
triggerType
,
createStreamReq
.
watermark
);
auditRecord
(
pReq
,
pMnode
->
clusterId
,
"createStream"
,
createStreamReq
.
name
,
""
,
detail
);
...
...
@@ -2301,12 +2304,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
doExtractTasksFromStream
(
pMnode
);
}
for
(
int32_t
i
=
0
;
i
<
req
.
numOfTasks
;
++
i
)
{
STaskStatusEntry
*
p
=
taosArrayGet
(
req
.
pTaskStatus
,
i
);
int64_t
k
[
2
]
=
{
p
->
streamId
,
p
->
taskId
};
int32_t
index
=
*
(
int32_t
*
)
taosHashGet
(
execNodeList
.
pTaskMap
,
&
k
,
sizeof
(
k
));
for
(
int32_t
i
=
0
;
i
<
req
.
numOfTasks
;
++
i
)
{
STaskStatusEntry
*
p
=
taosArrayGet
(
req
.
pTaskStatus
,
i
);
int64_t
k
[
2
]
=
{
p
->
streamId
,
p
->
taskId
};
int32_t
index
=
*
(
int32_t
*
)
taosHashGet
(
execNodeList
.
pTaskMap
,
&
k
,
sizeof
(
k
));
STaskStatusEntry
*
pStatusEntry
=
taosArrayGet
(
execNodeList
.
pTaskList
,
index
);
STaskStatusEntry
*
pStatusEntry
=
taosArrayGet
(
execNodeList
.
pTaskList
,
index
);
pStatusEntry
->
status
=
p
->
status
;
if
(
p
->
status
!=
TASK_STATUS__NORMAL
)
{
mDebug
(
"received s-task:0x%x not in ready status:%s"
,
p
->
taskId
,
streamGetTaskStatusStr
(
p
->
status
));
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
8645f763
...
...
@@ -134,46 +134,12 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
shuffleDispatcher
.
stbFullName
)
<
0
)
return
-
1
;
}
if
(
tEncodeI64
(
pEncoder
,
pTask
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tEncodeCStrWithLen
(
pEncoder
,
pTask
->
reserve
,
sizeof
(
pTask
->
reserve
)
-
1
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeStreamTaskChkInfo
(
SDecoder
*
pDecoder
,
SCheckpointInfo
*
pChkpInfo
)
{
int64_t
ver
;
int64_t
skip64
;
int8_t
skip8
;
int32_t
skip32
;
int16_t
skip16
;
SEpSet
epSet
;
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
ver
)
<
0
)
return
-
1
;
if
(
ver
!=
SSTREAM_TASK_VER
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
skip64
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
skip32
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
skip32
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
skip8
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
skip8
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
skip16
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
skip8
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
skip8
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
skip32
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
skip32
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pChkpInfo
->
checkpointId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pChkpInfo
->
checkpointVer
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
tDecodeStreamTask
(
SDecoder
*
pDecoder
,
SStreamTask
*
pTask
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
ver
)
<
0
)
return
-
1
;
...
...
@@ -245,6 +211,42 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeCStrTo
(
pDecoder
,
pTask
->
shuffleDispatcher
.
stbFullName
)
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pTask
->
reserve
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
tDecodeStreamTaskChkInfo
(
SDecoder
*
pDecoder
,
SCheckpointInfo
*
pChkpInfo
)
{
int64_t
ver
;
int64_t
skip64
;
int8_t
skip8
;
int32_t
skip32
;
int16_t
skip16
;
SEpSet
epSet
;
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
ver
)
<
0
)
return
-
1
;
if
(
ver
!=
SSTREAM_TASK_VER
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
skip64
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
skip32
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
skip32
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
skip8
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
skip8
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
skip16
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
skip8
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
skip8
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
skip32
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
skip32
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pChkpInfo
->
checkpointId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pChkpInfo
->
checkpointVer
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
...
...
@@ -483,7 +485,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
pTask
->
status
.
taskStatus
=
TASK_STATUS__STOP
;
qKillTask
(
pTask
->
exec
.
pExecutor
,
TSDB_CODE_SUCCESS
);
while
(
/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */
!
streamTaskIsIdle
(
pTask
))
{
while
(
/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */
!
streamTaskIsIdle
(
pTask
))
{
qDebug
(
"s-task:%s level:%d wait for task to be idle, check again in 100ms"
,
id
,
pTask
->
info
.
taskLevel
);
taosMsleep
(
100
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录