Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ea54ac9d
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
ea54ac9d
编写于
3月 24, 2022
作者:
L
Liu Jicong
提交者:
GitHub
3月 24, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10972 from taosdata/feature/tq
Feature/tq
上级
ee3967cb
4cf88c35
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
167 addition
and
90 deletion
+167
-90
include/common/tmsg.h
include/common/tmsg.h
+41
-16
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+1
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+19
-15
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+3
-1
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+22
-21
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+17
-21
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+27
-1
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+5
-1
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+3
-5
source/dnode/vnode/src/inc/tqInt.h
source/dnode/vnode/src/inc/tqInt.h
+6
-5
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+17
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+4
-2
未找到文件。
include/common/tmsg.h
浏览文件 @
ea54ac9d
...
...
@@ -1167,7 +1167,7 @@ typedef struct {
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
output
TbName
[
TSDB_TABLE_
NAME_LEN
];
char
output
STbName
[
TSDB_TABLE_F
NAME_LEN
];
int8_t
igExists
;
char
*
sql
;
char
*
ast
;
...
...
@@ -1975,7 +1975,7 @@ typedef struct {
int32_t
tagsFilterLen
;
// strlen + 1
int32_t
sqlLen
;
// strlen + 1
int32_t
astLen
;
// strlen + 1
char
*
expr
;
char
*
expr
;
char
*
tagsFilter
;
char
*
sql
;
char
*
ast
;
...
...
@@ -1997,9 +1997,9 @@ typedef struct {
int8_t
version
;
// for compatibility(default 0)
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
slidingUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
timezoneInt
;
// sma data expired if timezone changes.
int8_t
timezoneInt
;
// sma data expired if timezone changes.
char
indexName
[
TSDB_INDEX_NAME_LEN
];
char
timezone
[
TD_TIMEZONE_LEN
];
char
timezone
[
TD_TIMEZONE_LEN
];
int32_t
exprLen
;
int32_t
tagsFilterLen
;
int64_t
indexUid
;
...
...
@@ -2371,6 +2371,26 @@ enum {
STREAM_NEXT_OP_DST__SND
,
};
enum
{
STREAM_SOURCE_TYPE__NONE
=
1
,
STREAM_SOURCE_TYPE__SUPER
,
STREAM_SOURCE_TYPE__CHILD
,
STREAM_SOURCE_TYPE__NORMAL
,
};
enum
{
STREAM_SINK_TYPE__NONE
=
1
,
STREAM_SINK_TYPE__INPLACE
,
STREAM_SINK_TYPE__ASSIGNED
,
STREAM_SINK_TYPE__MULTIPLE
,
STREAM_SINK_TYPE__TEMPORARY
,
};
enum
{
STREAM_TYPE__NORMAL
=
1
,
STREAM_TYPE__SMA
,
};
typedef
struct
{
void
*
inputHandle
;
void
*
executor
;
...
...
@@ -2381,28 +2401,33 @@ typedef struct {
int32_t
taskId
;
int32_t
level
;
int8_t
status
;
int8_t
pipeSource
;
int8_t
pipeSink
;
int8_t
numOfRunners
;
int8_t
parallelizable
;
int8_t
nextOpDst
;
// vnode or snode
// vnode or snode
int8_t
nextOpDst
;
int8_t
sourceType
;
int8_t
sinkType
;
// for sink type assigned
int32_t
sinkVgId
;
SEpSet
NextOpEp
;
char
*
qmsg
;
// not applied to encoder and decoder
// executor meta info
char
*
qmsg
;
// followings are not applied to encoder and decoder
int8_t
numOfRunners
;
SStreamRunner
runner
[
8
];
// void* executor;
// void* stateStore;
// storage handle
}
SStreamTask
;
static
FORCE_INLINE
SStreamTask
*
streamTaskNew
(
int64_t
streamId
,
int32_t
level
)
{
static
FORCE_INLINE
SStreamTask
*
streamTaskNew
(
int64_t
streamId
)
{
SStreamTask
*
pTask
=
(
SStreamTask
*
)
calloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
NULL
;
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
level
=
level
;
pTask
->
status
=
STREAM_TASK_STATUS__RUNNING
;
pTask
->
qmsg
=
NULL
;
return
pTask
;
...
...
@@ -2435,7 +2460,7 @@ typedef struct {
int64_t
streamId
;
int64_t
version
;
SArray
*
res
;
// SArray<SSDataBlock>
}
SStreamS
maS
inkReq
;
}
SStreamSinkReq
;
#pragma pack(pop)
...
...
include/dnode/mnode/mnode.h
浏览文件 @
ea54ac9d
...
...
@@ -17,7 +17,9 @@
#define _TD_MND_H_
#include "monitor.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "trpc.h"
#ifdef __cplusplus
extern
"C"
{
...
...
source/client/src/tmq.c
浏览文件 @
ea54ac9d
...
...
@@ -505,7 +505,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
.
sql
=
(
char
*
)
sql
,
};
tNameExtractFullName
(
&
name
,
req
.
name
);
strcpy
(
req
.
outputTbName
,
tbName
);
strcpy
(
req
.
output
S
TbName
,
tbName
);
int
tlen
=
tSerializeSCMCreateStreamReq
(
NULL
,
0
,
&
req
);
void
*
buf
=
malloc
(
tlen
);
...
...
source/common/src/tmsg.c
浏览文件 @
ea54ac9d
...
...
@@ -314,7 +314,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
for
(
col_id_t
i
=
0
;
i
<
pReq
->
stbCfg
.
nBSmaCols
;
++
i
)
{
tlen
+=
taosEncodeFixedI16
(
buf
,
pReq
->
stbCfg
.
pBSmaCols
[
i
]);
}
if
(
pReq
->
rollup
&&
pReq
->
stbCfg
.
pRSmaParam
)
{
if
(
pReq
->
rollup
&&
pReq
->
stbCfg
.
pRSmaParam
)
{
SRSmaParam
*
param
=
pReq
->
stbCfg
.
pRSmaParam
;
tlen
+=
taosEncodeFixedU32
(
buf
,
(
uint32_t
)
param
->
xFilesFactor
);
tlen
+=
taosEncodeFixedI8
(
buf
,
param
->
delayUnit
);
...
...
@@ -341,7 +341,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
for
(
col_id_t
i
=
0
;
i
<
pReq
->
ntbCfg
.
nBSmaCols
;
++
i
)
{
tlen
+=
taosEncodeFixedI16
(
buf
,
pReq
->
ntbCfg
.
pBSmaCols
[
i
]);
}
if
(
pReq
->
rollup
&&
pReq
->
ntbCfg
.
pRSmaParam
)
{
if
(
pReq
->
rollup
&&
pReq
->
ntbCfg
.
pRSmaParam
)
{
SRSmaParam
*
param
=
pReq
->
ntbCfg
.
pRSmaParam
;
tlen
+=
taosEncodeFixedU32
(
buf
,
(
uint32_t
)
param
->
xFilesFactor
);
tlen
+=
taosEncodeFixedI8
(
buf
,
param
->
delayUnit
);
...
...
@@ -427,7 +427,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
ntbCfg
.
pSchema
[
i
].
name
);
}
buf
=
taosDecodeFixedI16
(
buf
,
&
(
pReq
->
ntbCfg
.
nBSmaCols
));
if
(
pReq
->
ntbCfg
.
nBSmaCols
>
0
)
{
if
(
pReq
->
ntbCfg
.
nBSmaCols
>
0
)
{
pReq
->
ntbCfg
.
pBSmaCols
=
(
col_id_t
*
)
malloc
(
pReq
->
ntbCfg
.
nBSmaCols
*
sizeof
(
col_id_t
));
for
(
col_id_t
i
=
0
;
i
<
pReq
->
ntbCfg
.
nBSmaCols
;
++
i
)
{
buf
=
taosDecodeFixedI16
(
buf
,
pReq
->
ntbCfg
.
pBSmaCols
+
i
);
...
...
@@ -435,10 +435,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
}
else
{
pReq
->
ntbCfg
.
pBSmaCols
=
NULL
;
}
if
(
pReq
->
rollup
)
{
if
(
pReq
->
rollup
)
{
pReq
->
ntbCfg
.
pRSmaParam
=
(
SRSmaParam
*
)
malloc
(
sizeof
(
SRSmaParam
));
SRSmaParam
*
param
=
pReq
->
ntbCfg
.
pRSmaParam
;
buf
=
taosDecodeFixedU32
(
buf
,
(
uint32_t
*
)
&
param
->
xFilesFactor
);
buf
=
taosDecodeFixedU32
(
buf
,
(
uint32_t
*
)
&
param
->
xFilesFactor
);
buf
=
taosDecodeFixedI8
(
buf
,
&
param
->
delayUnit
);
buf
=
taosDecodeFixedI8
(
buf
,
&
param
->
nFuncIds
);
if
(
param
->
nFuncIds
>
0
)
{
...
...
@@ -3045,7 +3045,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
outputTbName
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
output
S
TbName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
sqlLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
astLen
)
<
0
)
return
-
1
;
...
...
@@ -3068,7 +3068,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
outputTbName
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
output
S
TbName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
sqlLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
astLen
)
<
0
)
return
-
1
;
...
...
@@ -3101,12 +3101,14 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
pipeSource
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
pipeSink
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
parallelizable
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
nextOpDst
)
<
0
)
return
-
1
;
// if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
pTask
->
sinkType
==
STREAM_SINK_TYPE__ASSIGNED
)
{
if
(
tEncodeI32
(
pEncoder
,
pTask
->
sinkVgId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
}
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
qmsg
)
<
0
)
return
-
1
;
/*tEndEncode(pEncoder);*/
return
pEncoder
->
pos
;
...
...
@@ -3118,12 +3120,14 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
pipeSource
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
pipeSink
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
parallelizable
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
nextOpDst
)
<
0
)
return
-
1
;
// if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
pTask
->
sinkType
==
STREAM_SINK_TYPE__ASSIGNED
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
sinkVgId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
}
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
qmsg
)
<
0
)
return
-
1
;
/*tEndDecode(pDecoder);*/
return
0
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
ea54ac9d
...
...
@@ -720,6 +720,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
char
outputSTbName
[
TSDB_TABLE_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
int64_t
uid
;
...
...
@@ -729,11 +730,12 @@ typedef struct {
SRWLatch
lock
;
int8_t
status
;
// int32_t sqlLen;
int32_t
sinkVgId
;
// 0 for automatic
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
SArray
*
outputName
;
SArray
*
ColAlias
;
}
SStreamObj
;
int32_t
tEncodeSStreamObj
(
SCoder
*
pEncoder
,
const
SStreamObj
*
pObj
);
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
ea54ac9d
...
...
@@ -16,6 +16,7 @@
#include "mndDef.h"
int32_t
tEncodeSStreamObj
(
SCoder
*
pEncoder
,
const
SStreamObj
*
pObj
)
{
int32_t
sz
=
0
;
int32_t
outputNameSz
=
0
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
db
)
<
0
)
return
-
1
;
...
...
@@ -30,27 +31,26 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
// TODO encode tasks
if
(
pObj
->
tasks
)
{
int32_t
sz
=
taosArrayGetSize
(
pObj
->
tasks
);
tEncodeI32
(
pEncoder
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pArray
=
taosArrayGet
(
pObj
->
tasks
,
i
);
int32_t
innerSz
=
taosArrayGetSize
(
pArray
);
tEncodeI32
(
pEncoder
,
innerSz
);
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGet
(
pArray
,
j
);
tEncodeSStreamTask
(
pEncoder
,
pTask
);
}
sz
=
taosArrayGetSize
(
pObj
->
tasks
);
}
if
(
tEncodeI32
(
pEncoder
,
sz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pArray
=
taosArrayGet
(
pObj
->
tasks
,
i
);
int32_t
innerSz
=
taosArrayGetSize
(
pArray
);
if
(
tEncodeI32
(
pEncoder
,
innerSz
)
<
0
)
return
-
1
;
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGet
(
pArray
,
j
);
if
(
tEncodeSStreamTask
(
pEncoder
,
pTask
)
<
0
)
return
-
1
;
}
}
else
{
tEncodeI32
(
pEncoder
,
0
);
}
if
(
pObj
->
outputName
!=
NULL
)
{
outputNameSz
=
taosArrayGetSize
(
pObj
->
outputName
);
if
(
pObj
->
ColAlias
!=
NULL
)
{
outputNameSz
=
taosArrayGetSize
(
pObj
->
ColAlias
);
}
if
(
tEncodeI32
(
pEncoder
,
outputNameSz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
outputNameSz
;
i
++
)
{
char
*
name
=
taosArrayGetP
(
pObj
->
outputName
,
i
);
char
*
name
=
taosArrayGetP
(
pObj
->
ColAlias
,
i
);
if
(
tEncodeCStr
(
pEncoder
,
name
)
<
0
)
return
-
1
;
}
return
pEncoder
->
pos
;
...
...
@@ -68,6 +68,7 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
sql
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
logicalPlan
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
pObj
->
tasks
=
NULL
;
int32_t
sz
;
if
(
tDecodeI32
(
pDecoder
,
&
sz
)
<
0
)
return
-
1
;
if
(
sz
!=
0
)
{
...
...
@@ -83,19 +84,19 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
}
taosArrayPush
(
pObj
->
tasks
,
pArray
);
}
}
else
{
pObj
->
tasks
=
NULL
;
}
int32_t
outputNameSz
;
if
(
tDecodeI32
(
pDecoder
,
&
outputNameSz
)
<
0
)
return
-
1
;
pObj
->
outputName
=
taosArrayInit
(
outputNameSz
,
sizeof
(
void
*
));
if
(
pObj
->
outputName
==
NULL
)
{
return
-
1
;
if
(
outputNameSz
!=
0
)
{
pObj
->
ColAlias
=
taosArrayInit
(
outputNameSz
,
sizeof
(
void
*
));
if
(
pObj
->
ColAlias
==
NULL
)
{
return
-
1
;
}
}
for
(
int32_t
i
=
0
;
i
<
outputNameSz
;
i
++
)
{
char
*
name
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
name
)
<
0
)
return
-
1
;
taosArrayPush
(
pObj
->
outputName
,
&
name
);
taosArrayPush
(
pObj
->
ColAlias
,
&
name
);
}
return
0
;
}
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
ea54ac9d
...
...
@@ -58,7 +58,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
action
.
contLen
=
tlen
;
action
.
msgType
=
type
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
free
(
buf
);
return
-
1
;
}
return
0
;
...
...
@@ -131,13 +131,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
lastUsedVgId
=
pVgroup
->
vgId
;
pStream
->
vgNum
++
;
// send to vnode
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
pTask
->
pipeSource
=
1
;
pTask
->
pipeSink
=
level
==
totLevel
-
1
?
1
:
0
;
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
);
pTask
->
level
=
level
;
pTask
->
sourceType
=
1
;
pTask
->
sinkType
=
level
==
totLevel
-
1
?
1
:
0
;
pTask
->
parallelizable
=
1
;
// TODO: set to
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
...
...
@@ -146,13 +145,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
taosArrayPush
(
taskOneLevel
,
pTask
);
}
}
else
{
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
pTask
->
pipeSource
=
0
;
pTask
->
pipeSink
=
level
==
totLevel
-
1
?
1
:
0
;
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
);
pTask
->
level
=
level
;
pTask
->
sourceType
=
0
;
pTask
->
sinkType
=
level
==
totLevel
-
1
?
1
:
0
;
pTask
->
parallelizable
=
plan
->
subplanType
==
SUBPLAN_TYPE_SCAN
;
pTask
->
nextOpDst
=
STREAM_NEXT_OP_DST__VND
;
if
(
tsStreamSchedV
)
{
SSnodeObj
*
pSnode
=
mndSchedFetchSnode
(
pMnode
);
if
(
pSnode
==
NULL
||
tsStreamSchedV
)
{
ASSERT
(
lastUsedVgId
!=
0
);
SVgObj
*
pVg
=
mndAcquireVgroup
(
pMnode
,
lastUsedVgId
);
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVg
)
<
0
)
{
...
...
@@ -162,24 +163,19 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
}
sdbRelease
(
pSdb
,
pVg
);
}
else
{
SSnodeObj
*
pSnode
=
mndSchedFetchSnode
(
pMnode
);
if
(
pSnode
!=
NULL
)
{
if
(
mndAssignTaskToSnode
(
pMnode
,
pTrans
,
pTask
,
plan
,
pSnode
)
<
0
)
{
sdbRelease
(
pSdb
,
pSnode
);
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
}
sdbRelease
(
pMnode
->
pSdb
,
pSnode
);
}
else
{
// TODO: assign to one vg
ASSERT
(
0
);
if
(
mndAssignTaskToSnode
(
pMnode
,
pTrans
,
pTask
,
plan
,
pSnode
)
<
0
)
{
sdbRelease
(
pSdb
,
pSnode
);
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
}
}
sdbRelease
(
pMnode
->
pSdb
,
pSnode
);
taosArrayPush
(
taskOneLevel
,
pTask
);
}
taosArrayPush
(
pStream
->
tasks
,
taskOneLevel
);
}
qDestroyQueryPlan
(
pPlan
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
ea54ac9d
...
...
@@ -271,6 +271,30 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
}
}
if
(
pOld
->
numOfSmas
<
pNew
->
numOfSmas
)
{
void
*
pSmas
=
malloc
(
pNew
->
numOfSmas
*
sizeof
(
SSchema
));
if
(
pSmas
!=
NULL
)
{
free
(
pOld
->
pSmas
);
pOld
->
pSmas
=
pSmas
;
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mTrace
(
"stb:%s, failed to perform update action since %s"
,
pOld
->
name
,
terrstr
());
taosWUnLockLatch
(
&
pOld
->
lock
);
}
}
if
(
pOld
->
commentLen
<
pNew
->
commentLen
)
{
void
*
comment
=
malloc
(
pNew
->
commentLen
);
if
(
comment
!=
NULL
)
{
free
(
pOld
->
comment
);
pOld
->
comment
=
comment
;
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mTrace
(
"stb:%s, failed to perform update action since %s"
,
pOld
->
name
,
terrstr
());
taosWUnLockLatch
(
&
pOld
->
lock
);
}
}
pOld
->
updateTime
=
pNew
->
updateTime
;
pOld
->
version
=
pNew
->
version
;
pOld
->
nextColId
=
pNew
->
nextColId
;
...
...
@@ -278,7 +302,9 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld
->
numOfTags
=
pNew
->
numOfTags
;
memcpy
(
pOld
->
pColumns
,
pNew
->
pColumns
,
pOld
->
numOfColumns
*
sizeof
(
SSchema
));
memcpy
(
pOld
->
pTags
,
pNew
->
pTags
,
pOld
->
numOfTags
*
sizeof
(
SSchema
));
memcpy
(
pOld
->
comment
,
pNew
->
comment
,
TSDB_STB_COMMENT_LEN
);
if
(
pNew
->
commentLen
!=
0
)
{
memcpy
(
pOld
->
comment
,
pNew
->
comment
,
TSDB_STB_COMMENT_LEN
);
}
taosWUnLockLatch
(
&
pOld
->
lock
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
ea54ac9d
...
...
@@ -272,6 +272,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
if
(
nodesStringToNode
(
ast
,
&
pAst
)
<
0
)
{
return
-
1
;
}
#if 1
SArray
*
names
=
mndExtractNamesFromAst
(
pAst
);
printf
(
"|"
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
names
);
i
++
)
{
...
...
@@ -279,7 +280,8 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
}
printf
(
"
\n
=======================================================
\n
"
);
pStream
->
outputName
=
names
;
pStream
->
ColAlias
=
names
;
#endif
if
(
TSDB_CODE_SUCCESS
!=
mndStreamGetPlanString
(
ast
,
&
pStream
->
physicalPlan
))
{
mError
(
"topic:%s, failed to get plan since %s"
,
pStream
->
name
,
terrstr
());
...
...
@@ -290,6 +292,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
mError
(
"stream:%ld, schedule stream since %s"
,
pStream
->
uid
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
pStream
->
name
);
SSdbRaw
*
pRedoRaw
=
mndStreamActionEncode
(
pStream
);
if
(
pRedoRaw
==
NULL
||
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
{
...
...
@@ -307,6 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
SStreamObj
streamObj
=
{
0
};
tstrncpy
(
streamObj
.
name
,
pCreate
->
name
,
TSDB_STREAM_FNAME_LEN
);
tstrncpy
(
streamObj
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
tstrncpy
(
streamObj
.
outputSTbName
,
pCreate
->
outputSTbName
,
TSDB_TABLE_FNAME_LEN
);
streamObj
.
createTime
=
taosGetTimestampMs
();
streamObj
.
updateTime
=
streamObj
.
createTime
;
streamObj
.
uid
=
mndGenerateUid
(
pCreate
->
name
,
strlen
(
pCreate
->
name
));
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
ea54ac9d
...
...
@@ -358,9 +358,7 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
return
0
;
}
int32_t
mndStart
(
SMnode
*
pMnode
)
{
return
mndInitTimer
(
pMnode
);
}
int32_t
mndStart
(
SMnode
*
pMnode
)
{
return
mndInitTimer
(
pMnode
);
}
int32_t
mndProcessMsg
(
SNodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pNode
;
...
...
@@ -415,11 +413,11 @@ int64_t mndGenerateUid(char *name, int32_t len) {
int32_t
hashval
=
MurmurHash3_32
(
name
,
len
);
do
{
int64_t
us
=
taosGetTimestampUs
();
int64_t
us
=
taosGetTimestampUs
();
int64_t
x
=
(
us
&
0x000000FFFFFFFFFF
)
<<
24
;
int64_t
uuid
=
x
+
((
hashval
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
if
(
uuid
)
{
return
abs
(
uuid
);
return
ll
abs
(
uuid
);
}
}
while
(
true
);
}
...
...
source/dnode/vnode/src/inc/tqInt.h
浏览文件 @
ea54ac9d
...
...
@@ -161,15 +161,16 @@ typedef struct {
struct
STQ
{
// the collection of groups
// the handle of meta kvstore
bool
writeTrigger
;
char
*
path
;
STqCfg
*
tqConfig
;
STqMemRef
tqMemRef
;
STqMetaStore
*
tqMeta
;
STqPushMgr
*
tqPushMgr
;
SHashObj
*
pStreamTasks
;
SVnode
*
pVnode
;
SWal
*
pWal
;
SMeta
*
pVnodeMeta
;
//
STqPushMgr* tqPushMgr;
SHashObj
*
pStreamTasks
;
SVnode
*
pVnode
;
SWal
*
pWal
;
SMeta
*
pVnodeMeta
;
};
typedef
struct
{
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
ea54ac9d
...
...
@@ -55,6 +55,21 @@ typedef struct SVnodeMgr {
TD_DLIST
(
SVnodeTask
)
queue
;
}
SVnodeMgr
;
typedef
struct
{
int8_t
streamType
;
// sma or other
int8_t
dstType
;
int16_t
padding
;
int32_t
smaId
;
int64_t
tbUid
;
int64_t
lastReceivedVer
;
int64_t
lastCommittedVer
;
}
SStreamSinkInfo
;
typedef
struct
{
SVnode
*
pVnode
;
SHashObj
*
pHash
;
// streamId -> SStreamSinkInfo
}
SSink
;
extern
SVnodeMgr
vnodeMgr
;
// SVState
...
...
@@ -72,8 +87,9 @@ struct SVnode {
SVBufPool
*
pBufPool
;
SMeta
*
pMeta
;
STsdb
*
pTsdb
;
STQ
*
pTq
;
SWal
*
pWal
;
STQ
*
pTq
;
SSink
*
pSink
;
tsem_t
canCommit
;
SQHandle
*
pQuery
;
SMsgCb
msgCb
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
ea54ac9d
...
...
@@ -52,12 +52,14 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq
return
NULL
;
}
#if 0
pTq->tqPushMgr = tqPushMgrOpen();
if (pTq->tqPushMgr == NULL) {
// free store
free(pTq);
return NULL;
}
#endif
pTq
->
pStreamTasks
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
...
...
@@ -559,7 +561,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
!
pTask
->
pipeSourc
e
)
continue
;
if
(
!
pTask
->
sourceTyp
e
)
continue
;
int32_t
workerId
=
0
;
void
*
exec
=
pTask
->
runner
[
workerId
].
executor
;
...
...
@@ -576,7 +578,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
}
taosArrayPush
(
pRes
,
output
);
}
if
(
pTask
->
pipeSink
)
{
if
(
pTask
->
sinkType
)
{
// write back
/*printf("reach end\n");*/
tqDebugShowSSData
(
pRes
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录