Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f5411f10
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
f5411f10
编写于
3月 16, 2022
作者:
L
Liu Jicong
提交者:
GitHub
3月 16, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10777 from taosdata/feature/tq
Feature/tq
上级
16e5fd22
0ff5b63f
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
283 addition
and
142 deletion
+283
-142
include/common/tmsg.h
include/common/tmsg.h
+69
-30
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
include/dnode/snode/snode.h
include/dnode/snode/snode.h
+2
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+49
-23
source/dnode/mgmt/impl/src/dndSnode.c
source/dnode/mgmt/impl/src/dndSnode.c
+20
-7
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+3
-7
source/dnode/mnode/impl/inc/mndScheduler.h
source/dnode/mnode/impl/inc/mndScheduler.h
+2
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+47
-28
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+7
-0
source/dnode/snode/CMakeLists.txt
source/dnode/snode/CMakeLists.txt
+2
-1
source/dnode/snode/inc/sndInt.h
source/dnode/snode/inc/sndInt.h
+8
-23
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+63
-12
source/util/src/tuuid.c
source/util/src/tuuid.c
+10
-10
未找到文件。
include/common/tmsg.h
浏览文件 @
f5411f10
...
...
@@ -24,6 +24,7 @@
#include "thash.h"
#include "tlist.h"
#include "trow.h"
#include "tuuid.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -171,7 +172,7 @@ typedef struct {
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
dbId
;
int32_t
vgVersion
;
int32_t
numOfTable
;
// unit is TSDB_TABLE_NUM_UNIT
int32_t
numOfTable
;
// unit is TSDB_TABLE_NUM_UNIT
}
SBuildUseDBInput
;
typedef
struct
SField
{
...
...
@@ -427,10 +428,10 @@ typedef struct {
int16_t
slotId
;
};
int16_t
type
;
int32_t
bytes
;
uint8_t
precision
;
uint8_t
scale
;
int16_t
type
;
int32_t
bytes
;
uint8_t
precision
;
uint8_t
scale
;
}
SColumnInfo
;
typedef
struct
{
...
...
@@ -526,7 +527,7 @@ typedef struct {
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
dbId
;
int32_t
vgVersion
;
int32_t
numOfTable
;
// unit is TSDB_TABLE_NUM_UNIT
int32_t
numOfTable
;
// unit is TSDB_TABLE_NUM_UNIT
}
SUseDbReq
;
int32_t
tSerializeSUseDbReq
(
void
*
buf
,
int32_t
bufLen
,
SUseDbReq
*
pReq
);
...
...
@@ -553,15 +554,13 @@ int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t
tDeserializeSQnodeListReq
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListReq
*
pReq
);
typedef
struct
{
SArray
*
epSetList
;
// SArray<SEpSet>
SArray
*
epSetList
;
// SArray<SEpSet>
}
SQnodeListRsp
;
int32_t
tSerializeSQnodeListRsp
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListRsp
*
pRsp
);
int32_t
tDeserializeSQnodeListRsp
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListRsp
*
pRsp
);
void
tFreeSQnodeListRsp
(
SQnodeListRsp
*
pRsp
);
typedef
struct
{
SArray
*
pArray
;
// Array of SUseDbRsp
}
SUseDbBatchRsp
;
...
...
@@ -777,7 +776,6 @@ typedef struct SVgroupInfo {
int32_t
numOfTable
;
// unit is TSDB_TABLE_NUM_UNIT
}
SVgroupInfo
;
typedef
struct
{
int32_t
numOfVgroups
;
SVgroupInfo
vgroups
[];
...
...
@@ -1062,8 +1060,8 @@ typedef struct {
}
STaskStatus
;
typedef
struct
{
int64_t
refId
;
SArray
*
taskStatus
;
//
SArray<STaskStatus>
int64_t
refId
;
SArray
*
taskStatus
;
//
SArray<STaskStatus>
}
SSchedulerStatusRsp
;
typedef
struct
{
...
...
@@ -1072,35 +1070,31 @@ typedef struct {
int8_t
action
;
}
STaskAction
;
typedef
struct
SQueryNodeEpId
{
int32_t
nodeId
;
// vgId or qnodeId
SEp
ep
;
}
SQueryNodeEpId
;
typedef
struct
{
SMsgHead
header
;
uint64_t
sId
;
SQueryNodeEpId
epId
;
SArray
*
taskAction
;
//
SArray<STaskAction>
SArray
*
taskAction
;
//
SArray<STaskAction>
}
SSchedulerHbReq
;
int32_t
tSerializeSSchedulerHbReq
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbReq
*
pReq
);
int32_t
tDeserializeSSchedulerHbReq
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbReq
*
pReq
);
void
tFreeSSchedulerHbReq
(
SSchedulerHbReq
*
pReq
);
int32_t
tSerializeSSchedulerHbReq
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbReq
*
pReq
);
int32_t
tDeserializeSSchedulerHbReq
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbReq
*
pReq
);
void
tFreeSSchedulerHbReq
(
SSchedulerHbReq
*
pReq
);
typedef
struct
{
uint64_t
seqId
;
SQueryNodeEpId
epId
;
SArray
*
taskStatus
;
//
SArray<STaskStatus>
SArray
*
taskStatus
;
//
SArray<STaskStatus>
}
SSchedulerHbRsp
;
int32_t
tSerializeSSchedulerHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbRsp
*
pRsp
);
int32_t
tDeserializeSSchedulerHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbRsp
*
pRsp
);
void
tFreeSSchedulerHbRsp
(
SSchedulerHbRsp
*
pRsp
);
int32_t
tSerializeSSchedulerHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbRsp
*
pRsp
);
int32_t
tDeserializeSSchedulerHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbRsp
*
pRsp
);
void
tFreeSSchedulerHbRsp
(
SSchedulerHbRsp
*
pRsp
);
typedef
struct
{
SMsgHead
header
;
...
...
@@ -1370,7 +1364,7 @@ typedef struct SVCreateTbReq {
}
SVCreateTbReq
,
SVUpdateTbReq
;
typedef
struct
{
int
tmp
;
// TODO: to avoid compile error
int
tmp
;
// TODO: to avoid compile error
}
SVCreateTbRsp
,
SVUpdateTbRsp
;
int32_t
tSerializeSVCreateTbReq
(
void
**
buf
,
SVCreateTbReq
*
pReq
);
...
...
@@ -1382,7 +1376,7 @@ typedef struct {
}
SVCreateTbBatchReq
;
typedef
struct
{
int
tmp
;
// TODO: to avoid compile error
int
tmp
;
// TODO: to avoid compile error
}
SVCreateTbBatchRsp
;
int32_t
tSerializeSVCreateTbBatchReq
(
void
**
buf
,
SVCreateTbBatchReq
*
pReq
);
...
...
@@ -1396,7 +1390,7 @@ typedef struct {
}
SVDropTbReq
;
typedef
struct
{
int
tmp
;
// TODO: to avoid compile error
int
tmp
;
// TODO: to avoid compile error
}
SVDropTbRsp
;
int32_t
tSerializeSVDropTbReq
(
void
**
buf
,
SVDropTbReq
*
pReq
);
...
...
@@ -1934,7 +1928,7 @@ typedef struct {
}
SVCreateTSmaReq
;
typedef
struct
{
int8_t
type
;
// 0 status report, 1 update data
int8_t
type
;
// 0 status report, 1 update data
char
indexName
[
TSDB_INDEX_NAME_LEN
];
//
STimeWindow
windows
;
}
STSmaMsg
;
...
...
@@ -1945,7 +1939,7 @@ typedef struct {
}
SVDropTSmaReq
;
typedef
struct
{
int
tmp
;
// TODO: to avoid compile error
int
tmp
;
// TODO: to avoid compile error
}
SVCreateTSmaRsp
,
SVDropTSmaRsp
;
int32_t
tSerializeSVCreateTSmaReq
(
void
**
buf
,
SVCreateTSmaReq
*
pReq
);
...
...
@@ -2031,7 +2025,7 @@ static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
tlen
+=
taosEncodeFixedI64
(
buf
,
pSma
->
interval
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pSma
->
offset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pSma
->
sliding
);
if
(
pSma
->
exprLen
>
0
)
{
tlen
+=
taosEncodeString
(
buf
,
pSma
->
expr
);
}
...
...
@@ -2267,6 +2261,51 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
return
buf
;
}
enum
{
STREAM_TASK_STATUS__RUNNING
=
1
,
STREAM_TASK_STATUS__STOP
,
};
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
level
;
int8_t
status
;
char
*
qmsg
;
void
*
executor
;
// void* stateStore;
// storage handle
}
SStreamTask
;
static
FORCE_INLINE
SStreamTask
*
streamTaskNew
(
int64_t
streamId
,
int32_t
level
)
{
SStreamTask
*
pTask
=
(
SStreamTask
*
)
calloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
NULL
;
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
status
=
STREAM_TASK_STATUS__RUNNING
;
pTask
->
qmsg
=
NULL
;
return
pTask
;
}
int32_t
tEncodeSStreamTask
(
SCoder
*
pEncoder
,
const
SStreamTask
*
pTask
);
int32_t
tDecodeSStreamTask
(
SCoder
*
pDecoder
,
SStreamTask
*
pTask
);
void
tFreeSStreamTask
(
SStreamTask
*
pTask
);
typedef
struct
{
SMsgHead
head
;
SStreamTask
*
task
;
}
SStreamTaskDeployReq
;
typedef
struct
{
int32_t
reserved
;
}
SStreamTaskDeployRsp
;
typedef
struct
{
SMsgHead
head
;
// TODO: other info needed by task
}
SStreamTaskExecReq
;
#pragma pack(pop)
#ifdef __cplusplus
...
...
include/common/tmsgdef.h
浏览文件 @
f5411f10
...
...
@@ -199,6 +199,7 @@ enum {
// Requests handled by SNODE
TD_NEW_MSG_SEG
(
TDMT_SND_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_SND_TASK_DEPLOY
,
"snode-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
...
...
include/dnode/snode/snode.h
浏览文件 @
f5411f10
...
...
@@ -16,6 +16,7 @@
#ifndef _TD_SNODE_H_
#define _TD_SNODE_H_
#include "tcommon.h"
#include "tmsg.h"
#include "trpc.h"
...
...
@@ -78,7 +79,7 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
* @param pRsp The response message
* @return int32_t 0 for success, -1 for failure
*/
int32_t
sndProcessMsg
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
//
int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t
sndProcessUMsg
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
);
...
...
source/common/src/tmsg.c
浏览文件 @
f5411f10
...
...
@@ -1467,8 +1467,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
return
0
;
}
int32_t
tSerializeSQnodeListReq
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListReq
*
pReq
)
{
int32_t
tSerializeSQnodeListReq
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
@@ -1499,7 +1498,7 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp)
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
int32_t
num
=
taosArrayGetSize
(
pRsp
->
epSetList
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SEpSet
*
epSet
=
taosArrayGet
(
pRsp
->
epSetList
,
i
);
if
(
tEncodeSEpSet
(
&
encoder
,
epSet
)
<
0
)
return
-
1
;
...
...
@@ -2488,27 +2487,27 @@ int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pR
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
sId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
epId
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
sId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
epId
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeU16
(
&
encoder
,
pReq
->
epId
.
ep
.
port
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
epId
.
ep
.
fqdn
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
epId
.
ep
.
fqdn
)
<
0
)
return
-
1
;
if
(
pReq
->
taskAction
)
{
int32_t
num
=
taosArrayGetSize
(
pReq
->
taskAction
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STaskAction
*
action
=
taosArrayGet
(
pReq
->
taskAction
,
i
);
if
(
tEncodeU64
(
&
encoder
,
action
->
queryId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
action
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
action
->
action
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
action
->
queryId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
action
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
action
->
action
)
<
0
)
return
-
1
;
}
}
else
{
if
(
tEncodeI32
(
&
encoder
,
0
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
0
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
if
(
buf
!=
NULL
)
{
SMsgHead
*
pHead
=
(
SMsgHead
*
)((
char
*
)
buf
-
headLen
);
pHead
->
vgId
=
htonl
(
pReq
->
header
.
vgId
);
...
...
@@ -2556,29 +2555,27 @@ int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *
void
tFreeSSchedulerHbReq
(
SSchedulerHbReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
taskAction
);
}
int32_t
tSerializeSSchedulerHbRsp
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbRsp
*
pRsp
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pRsp
->
seqId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
epId
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pRsp
->
seqId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
epId
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeU16
(
&
encoder
,
pRsp
->
epId
.
ep
.
port
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
epId
.
ep
.
fqdn
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
epId
.
ep
.
fqdn
)
<
0
)
return
-
1
;
if
(
pRsp
->
taskStatus
)
{
int32_t
num
=
taosArrayGetSize
(
pRsp
->
taskStatus
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STaskStatus
*
status
=
taosArrayGet
(
pRsp
->
taskStatus
,
i
);
if
(
tEncodeU64
(
&
encoder
,
status
->
queryId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
status
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
status
->
refId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
status
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
status
->
queryId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
status
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
status
->
refId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
status
->
status
)
<
0
)
return
-
1
;
}
}
else
{
if
(
tEncodeI32
(
&
encoder
,
0
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
0
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
...
...
@@ -2691,3 +2688,32 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
tfree
(
pReq
->
physicalPlan
);
tfree
(
pReq
->
logicalPlan
);
}
int32_t
tEncodeSStreamTask
(
SCoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
qmsg
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeSStreamTask
(
SCoder
*
pDecoder
,
SStreamTask
*
pTask
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeCStr
(
pDecoder
,
(
const
char
**
)
&
pTask
->
qmsg
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
void
tFreeSStreamTask
(
SStreamTask
*
pTask
)
{
// TODO
/*free(pTask->qmsg);*/
/*free(pTask->executor);*/
/*free(pTask);*/
}
source/dnode/mgmt/impl/src/dndSnode.c
浏览文件 @
f5411f10
...
...
@@ -323,8 +323,8 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
static
void
dndProcessSnodeUniqueQueue
(
SDnode
*
pDnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
int32_t
code
=
TSDB_CODE_DND_SNODE_NOT_DEPLOYED
;
/*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/
int32_t
code
=
TSDB_CODE_DND_SNODE_NOT_DEPLOYED
;
SSnode
*
pSnode
=
dndAcquireSnode
(
pDnode
);
if
(
pSnode
!=
NULL
)
{
...
...
@@ -334,22 +334,35 @@ static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t
sndProcessUMsg
(
pSnode
,
pMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
dndReleaseSnode
(
pDnode
,
pSnode
);
}
else
{
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
i
++
)
{
SRpcMsg
*
pMsg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
code
=
code
};
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
}
dndReleaseSnode
(
pDnode
,
pSnode
);
}
static
void
dndProcessSnodeSharedQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
)
{
SSnodeMgmt
*
pMgmt
=
&
pDnode
->
smgmt
;
int32_t
code
=
TSDB_CODE_DND_SNODE_NOT_DEPLOYED
;
/*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/
int32_t
code
=
TSDB_CODE_DND_SNODE_NOT_DEPLOYED
;
SSnode
*
pSnode
=
dndAcquireSnode
(
pDnode
);
if
(
pSnode
!=
NULL
)
{
code
=
sndProcessSMsg
(
pSnode
,
pMsg
);
sndProcessSMsg
(
pSnode
,
pMsg
);
dndReleaseSnode
(
pDnode
,
pSnode
);
}
else
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
code
=
code
};
rpcSendResponse
(
&
rpcRsp
);
}
dndReleaseSnode
(
pDnode
,
pSnode
);
#if 0
if (pMsg->msgType & 1u) {
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
f5411f10
...
...
@@ -85,6 +85,8 @@ typedef enum {
TRN_TYPE_REBALANCE
=
1017
,
TRN_TYPE_COMMIT_OFFSET
=
1018
,
TRN_TYPE_CREATE_STREAM
=
1019
,
TRN_TYPE_DROP_STREAM
=
1020
,
TRN_TYPE_ALTER_STREAM
=
1021
,
TRN_TYPE_BASIC_SCOPE_END
,
TRN_TYPE_GLOBAL_SCOPE
=
2000
,
TRN_TYPE_CREATE_DNODE
=
2001
,
...
...
@@ -679,12 +681,6 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
return
buf
;
}
typedef
struct
{
int32_t
taskId
;
int32_t
level
;
SSubplan
*
plan
;
}
SStreamTaskMeta
;
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
...
...
@@ -700,7 +696,7 @@ typedef struct {
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
SArray
*
tasks
;
// SArray<SArray<SStreamTask
Meta
>>
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
}
SStreamObj
;
int32_t
tEncodeSStreamObj
(
SCoder
*
pEncoder
,
const
SStreamObj
*
pObj
);
...
...
source/dnode/mnode/impl/inc/mndScheduler.h
浏览文件 @
f5411f10
...
...
@@ -27,6 +27,8 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t
mndSchedInitSubEp
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
);
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
f5411f10
...
...
@@ -31,7 +31,7 @@
#include "tname.h"
#include "tuuid.h"
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
)
{
int32_t
mndScheduleStream
(
SMnode
*
pMnode
,
S
Trans
*
pTrans
,
S
StreamObj
*
pStream
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
SQueryPlan
*
pPlan
=
qStringToQueryPlan
(
pStream
->
physicalPlan
);
...
...
@@ -41,17 +41,18 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
ASSERT
(
pStream
->
vgNum
==
0
);
int32_t
levelNum
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
pStream
->
tasks
=
taosArrayInit
(
levelNum
,
sizeof
(
SArray
));
int32_t
totLevel
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
pStream
->
tasks
=
taosArrayInit
(
totLevel
,
sizeof
(
SArray
));
for
(
int32_t
i
=
0
;
i
<
levelNum
;
i
++
)
{
SArray
*
taskOneLevel
=
taosArrayInit
(
0
,
sizeof
(
SStreamTaskMeta
));
SNodeListNode
*
inner
=
nodesListGetNode
(
pPlan
->
pSubplans
,
i
);
int32_t
msgLen
;
for
(
int32_t
level
=
0
;
level
<
totLevel
;
level
++
)
{
SArray
*
taskOneLevel
=
taosArrayInit
(
0
,
sizeof
(
SStreamTask
));
SNodeListNode
*
inner
=
nodesListGetNode
(
pPlan
->
pSubplans
,
level
);
int32_t
opNum
=
LIST_LENGTH
(
inner
->
pNodeList
);
ASSERT
(
opNum
==
1
);
SSubplan
*
plan
=
nodesListGetNode
(
inner
->
pNodeList
,
0
);
if
(
i
==
0
)
{
SSubplan
*
plan
=
nodesListGetNode
(
inner
->
pNodeList
,
level
);
if
(
level
==
0
)
{
ASSERT
(
plan
->
type
==
SUBPLAN_TYPE_SCAN
);
void
*
pIter
=
NULL
;
while
(
1
)
{
...
...
@@ -63,15 +64,19 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
pStream
->
vgNum
++
;
// send to vnode
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
SStreamTaskMeta
task
=
{
.
taskId
=
tGenIdPI32
(),
.
level
=
i
,
.
plan
=
plan
,
}
;
// send to vnode
taosArrayPush
(
taskOneLevel
,
&
t
ask
);
if
(
qSubPlanToString
(
plan
,
&
pTask
->
qmsg
,
&
msgLen
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
taosArrayPush
(
taskOneLevel
,
pT
ask
);
}
}
else
if
(
plan
->
subplanType
==
SUBPLAN_TYPE_SCAN
)
{
// duplicatable
...
...
@@ -82,22 +87,36 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
// if has snode, set to shared thread num in snode
parallel
=
SND_SHARED_THREAD_NUM
;
for
(
int32_t
j
=
0
;
j
<
parallel
;
j
++
)
{
SStreamTaskMeta
task
=
{
.
taskId
=
tGenIdPI32
(),
.
level
=
i
,
.
plan
=
plan
,
};
taosArrayPush
(
taskOneLevel
,
&
task
);
for
(
int32_t
i
=
0
;
i
<
parallel
;
i
++
)
{
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
// TODO:get snode id and ep
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
if
(
qSubPlanToString
(
plan
,
&
pTask
->
qmsg
,
&
msgLen
)
<
0
)
{
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
taosArrayPush
(
taskOneLevel
,
pTask
);
}
}
else
{
// not duplicatable
SStreamTaskMeta
task
=
{
.
taskId
=
tGenIdPI32
(),
.
level
=
i
,
.
plan
=
plan
,
};
taosArrayPush
(
taskOneLevel
,
&
task
);
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
// TODO:get snode id and ep
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
if
(
qSubPlanToString
(
plan
,
&
pTask
->
qmsg
,
&
msgLen
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
taosArrayPush
(
taskOneLevel
,
pTask
);
}
taosArrayPush
(
pStream
->
tasks
,
taskOneLevel
);
}
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
f5411f10
...
...
@@ -18,6 +18,7 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTrans.h"
...
...
@@ -237,6 +238,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
if
(
mndScheduleStream
(
pMnode
,
pTrans
,
&
streamObj
)
<
0
)
{
mError
(
"stream:%ld, schedule stream since %s"
,
streamObj
.
uid
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
...
...
source/dnode/snode/CMakeLists.txt
浏览文件 @
f5411f10
...
...
@@ -7,8 +7,9 @@ target_include_directories(
)
target_link_libraries
(
snode
PRIVATE executor
PRIVATE transport
PRIVATE os
PRIVATE common
PRIVATE util
)
\ No newline at end of file
)
source/dnode/snode/inc/sndInt.h
浏览文件 @
f5411f10
...
...
@@ -38,13 +38,8 @@ enum {
STREAM_STATUS__DELETING
,
};
enum
{
STREAM_TASK_STATUS__RUNNING
=
1
,
STREAM_TASK_STATUS__STOP
,
};
typedef
struct
{
SHashObj
*
pHash
;
// taskId ->
s
treamTask
SHashObj
*
pHash
;
// taskId ->
SS
treamTask
}
SStreamMeta
;
typedef
struct
SSnode
{
...
...
@@ -52,26 +47,16 @@ typedef struct SSnode {
SSnodeOpt
cfg
;
}
SSnode
;
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
IdxInLevel
;
int32_t
level
;
}
SStreamTaskInfo
;
SStreamMeta
*
sndMetaNew
();
void
sndMetaDelete
(
SStreamMeta
*
pMeta
);
typedef
struct
{
SStreamTaskInfo
meta
;
int8_t
status
;
void
*
executor
;
void
*
stateStore
;
// storage handle
}
SStreamTask
;
int32_t
sndMetaDeployTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
);
int32_t
sndMetaRemoveTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
int32_t
sndCreateTask
();
int32_t
sndDropTaskOfStream
(
int64_t
streamId
);
int32_t
sndDropTaskOfStream
(
SStreamMeta
*
pMeta
,
int64_t
streamId
);
int32_t
sndStopTaskOfStream
(
int64_t
streamId
);
int32_t
sndResumeTaskOfStream
(
int64_t
streamId
);
int32_t
sndStopTaskOfStream
(
SStreamMeta
*
pMeta
,
int64_t
streamId
);
int32_t
sndResumeTaskOfStream
(
SStreamMeta
*
pMeta
,
int64_t
streamId
);
#ifdef __cplusplus
}
...
...
source/dnode/snode/src/snode.c
浏览文件 @
f5411f10
...
...
@@ -13,40 +13,91 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "sndInt.h"
#include "tuuid.h"
SSnode
*
sndOpen
(
const
char
*
path
,
const
SSnodeOpt
*
pOption
)
{
SSnode
*
pSnode
=
calloc
(
1
,
sizeof
(
SSnode
));
if
(
pSnode
==
NULL
)
{
return
NULL
;
}
memcpy
(
&
pSnode
->
cfg
,
pOption
,
sizeof
(
SSnodeOpt
));
pSnode
->
pMeta
=
sndMetaNew
();
if
(
pSnode
->
pMeta
==
NULL
)
{
free
(
pSnode
);
return
NULL
;
}
return
pSnode
;
}
void
sndClose
(
SSnode
*
pSnode
)
{
free
(
pSnode
);
}
void
sndClose
(
SSnode
*
pSnode
)
{
sndMetaDelete
(
pSnode
->
pMeta
);
free
(
pSnode
);
}
int32_t
sndGetLoad
(
SSnode
*
pSnode
,
SSnodeLoad
*
pLoad
)
{
return
0
;
}
int32_t
sndProcessMsg
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
*
pRsp
=
NULL
;
return
0
;
}
/*int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {*/
/**pRsp = NULL;*/
/*return 0;*/
/*}*/
void
sndDestroy
(
const
char
*
path
)
{}
static
int32_t
sndDeployTask
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
)
{
SStreamTask
*
task
=
malloc
(
sizeof
(
SStreamTask
));
if
(
task
==
NULL
)
{
SStreamMeta
*
sndMetaNew
()
{
SStreamMeta
*
pMeta
=
calloc
(
1
,
sizeof
(
SStreamMeta
));
if
(
pMeta
==
NULL
)
{
return
NULL
;
}
pMeta
->
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
if
(
pMeta
->
pHash
==
NULL
)
{
free
(
pMeta
);
return
NULL
;
}
return
pMeta
;
}
void
sndMetaDelete
(
SStreamMeta
*
pMeta
)
{
taosHashCleanup
(
pMeta
->
pHash
);
free
(
pMeta
);
}
int32_t
sndMetaDeployTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
pTask
->
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
qmsg
,
NULL
);
return
taosHashPut
(
pMeta
->
pHash
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
void
*
));
}
int32_t
sndMetaRemoveTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
)
{
SStreamTask
*
pTask
=
taosHashGet
(
pMeta
->
pHash
,
&
taskId
,
sizeof
(
int32_t
));
if
(
pTask
==
NULL
)
{
return
-
1
;
}
task
->
meta
.
taskId
=
tGenIdPI32
();
taosHashPut
(
pSnode
->
pMeta
->
pHash
,
&
task
->
meta
.
taskId
,
sizeof
(
int32_t
),
&
task
,
sizeof
(
void
*
));
return
0
;
free
(
pTask
->
qmsg
);
// TODO:free executor
free
(
pTask
);
return
taosHashRemove
(
pMeta
->
pHash
,
&
taskId
,
sizeof
(
int32_t
));
}
int32_t
sndProcessUMsg
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
)
{
// stream deploy
ment
// stream deploy
// stream stop/resume
// operator exec
if
(
pMsg
->
msgType
==
TDMT_SND_TASK_DEPLOY
)
{
void
*
msg
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
SStreamTask
*
pTask
=
malloc
(
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
-
1
;
}
SCoder
decoder
;
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
msg
,
pMsg
->
contLen
-
sizeof
(
SMsgHead
),
TD_DECODER
);
tDecodeSStreamTask
(
&
decoder
,
pTask
);
tCoderClear
(
&
decoder
);
sndMetaDeployTask
(
pSnode
->
pMeta
,
pTask
);
}
else
{
//
}
return
0
;
}
...
...
source/util/src/tuuid.c
浏览文件 @
f5411f10
...
...
@@ -15,43 +15,43 @@
#include "tuuid.h"
static
int64_t
h
ashId
=
0
;
static
int32_t
SerialNo
=
0
;
static
int64_t
tUUIDH
ashId
=
0
;
static
int32_t
tUUID
SerialNo
=
0
;
int32_t
tGenIdPI32
(
void
)
{
if
(
h
ashId
==
0
)
{
if
(
tUUIDH
ashId
==
0
)
{
char
uid
[
64
];
int32_t
code
=
taosGetSystemUUID
(
uid
,
tListLen
(
uid
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
}
else
{
h
ashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
tUUIDH
ashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
SerialNo
,
1
);
int32_t
val
=
atomic_add_fetch_32
(
&
tUUID
SerialNo
,
1
);
int32_t
id
=
((
h
ashId
&
0x1F
)
<<
26
)
|
((
pid
&
0x3F
)
<<
20
)
|
((
ts
&
0xFFF
)
<<
8
)
|
(
val
&
0xFF
);
int32_t
id
=
((
tUUIDH
ashId
&
0x1F
)
<<
26
)
|
((
pid
&
0x3F
)
<<
20
)
|
((
ts
&
0xFFF
)
<<
8
)
|
(
val
&
0xFF
);
return
id
;
}
int64_t
tGenIdPI64
(
void
)
{
if
(
h
ashId
==
0
)
{
if
(
tUUIDH
ashId
==
0
)
{
char
uid
[
64
];
int32_t
code
=
taosGetSystemUUID
(
uid
,
tListLen
(
uid
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
}
else
{
h
ashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
tUUIDH
ashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
SerialNo
,
1
);
int32_t
val
=
atomic_add_fetch_32
(
&
tUUID
SerialNo
,
1
);
int64_t
id
=
((
h
ashId
&
0x07FF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
int64_t
id
=
((
tUUIDH
ashId
&
0x07FF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
return
id
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录