Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8477d22d
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8477d22d
编写于
6月 07, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feat/row_refact
上级
f60e2879
7c65ac75
变更
48
展开全部
显示空白变更内容
内联
并排
Showing
48 changed file
with
1382 addition
and
583 deletion
+1382
-583
include/common/tmsg.h
include/common/tmsg.h
+51
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+11
-14
include/common/ttokendef.h
include/common/ttokendef.h
+3
-3
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+2
-1
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+32
-8
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+179
-0
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+2
-1
source/dnode/mgmt/mgmt_snode/src/smHandle.c
source/dnode/mgmt/mgmt_snode/src/smHandle.c
+4
-3
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+7
-12
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+2
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+7
-8
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+104
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+2
-2
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+12
-14
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+3
-5
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+11
-9
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+94
-56
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+53
-0
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+34
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+5
-0
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-0
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+2
-1
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+18
-6
source/libs/parser/src/parTokenizer.c
source/libs/parser/src/parTokenizer.c
+5
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+60
-2
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+2
-0
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+384
-384
source/libs/parser/test/parExplainToSyncdbTest.cpp
source/libs/parser/test/parExplainToSyncdbTest.cpp
+57
-3
source/libs/parser/test/parInitialATest.cpp
source/libs/parser/test/parInitialATest.cpp
+13
-0
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+52
-17
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+1
-1
source/libs/qworker/src/qwUtil.c
source/libs/qworker/src/qwUtil.c
+16
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+3
-2
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+1
-0
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+21
-14
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+1
-1
source/libs/stream/src/streamMsg.c
source/libs/stream/src/streamMsg.c
+2
-2
source/util/src/terror.c
source/util/src/terror.c
+83
-3
tests/pytest/test.py
tests/pytest/test.py
+1
-1
tests/pytest/util/dnodes.py
tests/pytest/util/dnodes.py
+1
-1
tests/system-test/2-query/between.py
tests/system-test/2-query/between.py
+1
-1
tests/system-test/2-query/cast.py
tests/system-test/2-query/cast.py
+4
-4
tests/system-test/test-all.bat
tests/system-test/test-all.bat
+30
-1
tests/system-test/test.py
tests/system-test/test.py
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
8477d22d
...
...
@@ -1315,6 +1315,31 @@ typedef struct {
int32_t
tSerializeSKillTransReq
(
void
*
buf
,
int32_t
bufLen
,
SKillTransReq
*
pReq
);
int32_t
tDeserializeSKillTransReq
(
void
*
buf
,
int32_t
bufLen
,
SKillTransReq
*
pReq
);
typedef
struct
{
int32_t
useless
;
// useless
}
SBalanceVgroupReq
;
int32_t
tSerializeSBalanceVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SBalanceVgroupReq
*
pReq
);
int32_t
tDeserializeSBalanceVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SBalanceVgroupReq
*
pReq
);
typedef
struct
{
int32_t
vgId1
;
int32_t
vgId2
;
}
SMergeVgroupReq
;
int32_t
tSerializeSMergeVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SMergeVgroupReq
*
pReq
);
int32_t
tDeserializeSMergeVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SMergeVgroupReq
*
pReq
);
typedef
struct
{
int32_t
vgId
;
int32_t
dnodeId1
;
int32_t
dnodeId2
;
int32_t
dnodeId3
;
}
SRedistributeVgroupReq
;
int32_t
tSerializeSRedistributeVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SRedistributeVgroupReq
*
pReq
);
int32_t
tDeserializeSRedistributeVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SRedistributeVgroupReq
*
pReq
);
typedef
struct
{
char
user
[
TSDB_USER_LEN
];
char
spi
;
...
...
@@ -2460,6 +2485,32 @@ typedef struct {
int32_t
tSerializeSUserIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SUserIndexRsp
*
pRsp
);
int32_t
tDeserializeSUserIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
SUserIndexRsp
*
pRsp
);
typedef
struct
{
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
}
STableIndexReq
;
int32_t
tSerializeSTableIndexReq
(
void
*
buf
,
int32_t
bufLen
,
STableIndexReq
*
pReq
);
int32_t
tDeserializeSTableIndexReq
(
void
*
buf
,
int32_t
bufLen
,
STableIndexReq
*
pReq
);
typedef
struct
{
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int64_t
dstTbUid
;
int32_t
dstVgId
;
// for stream
char
*
expr
;
}
STableIndexInfo
;
typedef
struct
{
SArray
*
pIndex
;
}
STableIndexRsp
;
int32_t
tSerializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
STableIndexRsp
*
pRsp
);
int32_t
tDeserializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
STableIndexRsp
*
pRsp
);
typedef
struct
{
int8_t
mqMsgType
;
int32_t
code
;
...
...
include/common/tmsgdef.h
浏览文件 @
8477d22d
...
...
@@ -130,6 +130,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_INDEX
,
"create-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_INDEX
,
"drop-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_INDEX
,
"get-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_TABLE_INDEX
,
"get-table-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_TOPIC
,
"create-topic"
,
SMCreateTopicReq
,
SMCreateTopicRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_TOPIC
,
"alter-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_TOPIC
,
"drop-topic"
,
NULL
,
NULL
)
...
...
@@ -153,6 +154,9 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_GRANT
,
"grant"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_AUTH
,
"auth"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_APPLY_MSG
,
"mnode-apply"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_BALANCE_VGROUP
,
"balance-vgroup"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MERGE_VGROUP
,
"merge-vgroup"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_REDISTRIBUTE_VGROUP
,
"redistribute-vgroup"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_VND_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBMIT
,
"submit"
,
SSubmitReq
,
SSubmitRsp
)
...
...
@@ -169,10 +173,6 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_STB
,
"vnode-create-stb"
,
SVCreateStbReq
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_STB
,
"vnode-alter-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_STB
,
"vnode-drop-stb"
,
SVDropStbReq
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_CONSUME
,
"vnode-mq-consume"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_QUERY
,
"vnode-mq-query"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_CONNECT
,
"vnode-mq-connect"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_DISCONNECT
,
"vnode-mq-disconnect"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_VG_CHANGE
,
"vnode-mq-vg-change"
,
SMqRebVgReq
,
SMqRebVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_VG_DELETE
,
"vnode-mq-vg-delete"
,
SMqVDeleteReq
,
SMqVDeleteRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_TASK
,
"vnode-cancel-task"
,
NULL
,
NULL
)
...
...
@@ -183,12 +183,8 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_EXPLAIN
,
"vnode-explain"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqPollReq
,
SMqDataBlkRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_DEPLOY
,
"vnode-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_TRIGGER
,
"vnode-stream-trigger"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_RUN
,
"vnode-stream-task-run"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_DISPATCH
,
"vnode-stream-task-dispatch"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_DISPATCH_WRITE
,
"vnode-stream-task-dispatch-write"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASK_RECOVER
,
"vnode-stream-task-recover"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STREAM_DISPATCH_WRITE
,
"vnode-stream-task-dispatch-write"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_SMA
,
"vnode-create-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_SMA
,
"vnode-cancel-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_SMA
,
"vnode-drop-sma"
,
NULL
,
NULL
)
...
...
@@ -202,11 +198,12 @@ enum {
TD_NEW_MSG_SEG
(
TDMT_QND_MSG
)
TD_NEW_MSG_SEG
(
TDMT_SND_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_SND_TASK_DEPLOY
,
"snode-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
TD_DEF_MSG_TYPE
(
TDMT_SND_TASK_RUN
,
"snode-stream-task-run"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SND_TASK_DISPATCH
,
"snode-stream-task-dispatch"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SND_TASK_RECOVER
,
"snode-stream-task-recover"
,
NULL
,
NULL
)
//shared by snode and vnode
TD_NEW_MSG_SEG
(
TDMT_STREAM_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DEPLOY
,
"stream-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_RUN
,
"stream-task-run"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DISPATCH
,
"stream-task-dispatch"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_RECOVER
,
"stream-task-recover"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_SCH_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_LINK_BROKEN
,
"link-broken"
,
NULL
,
NULL
)
...
...
include/common/ttokendef.h
浏览文件 @
8477d22d
...
...
@@ -188,10 +188,10 @@
#define TK_KILL 170
#define TK_CONNECTION 171
#define TK_TRANSACTION 172
#define TK_
MERGE
173
#define TK_
BALANCE
173
#define TK_VGROUP 174
#define TK_
REDISTRIBUTE
175
#define TK_
SPLIT
176
#define TK_
MERGE
175
#define TK_
REDISTRIBUTE
176
#define TK_SYNCDB 177
#define TK_DELETE 178
#define TK_NULL 179
...
...
include/libs/catalog/catalog.h
浏览文件 @
8477d22d
...
...
@@ -272,6 +272,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t
catalogGetIndexMeta
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
indexName
,
SIndexInfo
*
pInfo
);
int32_t
catalogGetTableIndex
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
tbFName
,
SArray
**
pRes
);
int32_t
catalogGetUdfInfo
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
funcName
,
SFuncInfo
*
pInfo
);
int32_t
catalogChkAuth
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
);
...
...
@@ -280,7 +282,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t
catalogUpdateVgEpSet
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
epSet
);
int32_t
ctgdLaunchAsyncCall
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
uint64_t
reqId
,
bool
forceUpdate
);
...
...
include/libs/nodes/cmdnodes.h
浏览文件 @
8477d22d
...
...
@@ -28,6 +28,14 @@ extern "C" {
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef
struct
SDatabaseOptions
{
ENodeType
type
;
int32_t
buffer
;
...
...
@@ -316,14 +324,6 @@ typedef struct SDropFunctionStmt {
bool
ignoreNotExists
;
}
SDropFunctionStmt
;
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef
struct
SGrantStmt
{
ENodeType
type
;
char
userName
[
TSDB_USER_LEN
];
...
...
@@ -333,6 +333,30 @@ typedef struct SGrantStmt {
typedef
SGrantStmt
SRevokeStmt
;
typedef
struct
SBalanceVgroupStmt
{
ENodeType
type
;
}
SBalanceVgroupStmt
;
typedef
struct
SMergeVgroupStmt
{
ENodeType
type
;
int32_t
vgId1
;
int32_t
vgId2
;
}
SMergeVgroupStmt
;
typedef
struct
SRedistributeVgroupStmt
{
ENodeType
type
;
int32_t
vgId
;
int32_t
dnodeId1
;
int32_t
dnodeId2
;
int32_t
dnodeId3
;
SNodeList
*
pDnodes
;
}
SRedistributeVgroupStmt
;
typedef
struct
SSplitVgroupStmt
{
ENodeType
type
;
int32_t
vgId
;
}
SSplitVgroupStmt
;
#ifdef __cplusplus
}
#endif
...
...
include/libs/nodes/nodes.h
浏览文件 @
8477d22d
...
...
@@ -140,6 +140,7 @@ typedef enum ENodeType {
QUERY_NODE_DROP_FUNCTION_STMT
,
QUERY_NODE_CREATE_STREAM_STMT
,
QUERY_NODE_DROP_STREAM_STMT
,
QUERY_NODE_BALANCE_VGROUP_STMT
,
QUERY_NODE_MERGE_VGROUP_STMT
,
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT
,
QUERY_NODE_SPLIT_VGROUP_STMT
,
...
...
include/util/taoserror.h
浏览文件 @
8477d22d
...
...
@@ -653,6 +653,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_VALUE_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x2653)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2654)
#define TSDB_CODE_PAR_INVALID_DELETE_WHERE TAOS_DEF_ERROR_CODE(0, 0x2655)
#define TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG TAOS_DEF_ERROR_CODE(0, 0x2656)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
...
...
source/common/src/tmsg.c
浏览文件 @
8477d22d
...
...
@@ -2394,6 +2394,102 @@ int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp
return
0
;
}
int32_t
tSerializeSTableIndexReq
(
void
*
buf
,
int32_t
bufLen
,
STableIndexReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
tbFName
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSTableIndexReq
(
void
*
buf
,
int32_t
bufLen
,
STableIndexReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
tbFName
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSTableIndexInfo
(
SEncoder
*
pEncoder
,
STableIndexInfo
*
pInfo
)
{
if
(
tEncodeI8
(
pEncoder
,
pInfo
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pInfo
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
interval
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
offset
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
sliding
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
dstTbUid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pInfo
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pInfo
->
expr
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tSerializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
STableIndexRsp
*
pRsp
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
int32_t
num
=
taosArrayGetSize
(
pRsp
->
pIndex
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
if
(
num
>
0
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STableIndexInfo
*
pInfo
=
(
STableIndexInfo
*
)
taosArrayGet
(
pRsp
->
pIndex
,
i
);
if
(
tSerializeSTableIndexInfo
(
&
encoder
,
pInfo
)
<
0
)
return
-
1
;
}
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSTableIndexInfo
(
SDecoder
*
pDecoder
,
STableIndexInfo
*
pInfo
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pInfo
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pInfo
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
interval
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
offset
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
sliding
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
dstTbUid
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pInfo
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pInfo
->
expr
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tDeserializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
STableIndexRsp
*
pRsp
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
int32_t
num
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
if
(
num
>
0
)
{
pRsp
->
pIndex
=
taosArrayInit
(
num
,
sizeof
(
STableIndexInfo
));
if
(
NULL
==
pRsp
->
pIndex
)
return
-
1
;
STableIndexInfo
info
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
if
(
tDeserializeSTableIndexInfo
(
&
decoder
,
&
info
)
<
0
)
return
-
1
;
if
(
NULL
==
taosArrayPush
(
pRsp
->
pIndex
,
&
info
))
{
taosMemoryFree
(
info
.
expr
);
return
-
1
;
}
}
}
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSShowReq
(
void
*
buf
,
int32_t
bufLen
,
SShowReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
...
...
@@ -3258,6 +3354,89 @@ int32_t tDeserializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq
return
0
;
}
int32_t
tSerializeSBalanceVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SBalanceVgroupReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
useless
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSBalanceVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SBalanceVgroupReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
useless
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSMergeVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SMergeVgroupReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
vgId1
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
vgId2
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSMergeVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SMergeVgroupReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
vgId1
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
vgId2
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSRedistributeVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SRedistributeVgroupReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
vgId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
dnodeId1
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
dnodeId2
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
dnodeId3
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSRedistributeVgroupReq
(
void
*
buf
,
int32_t
bufLen
,
SRedistributeVgroupReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
vgId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dnodeId1
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dnodeId2
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dnodeId3
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSDCreateMnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SDCreateMnodeReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
8477d22d
...
...
@@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_SMA
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_STREAM
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_INDEX
,
mmPutNodeMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_TABLE_INDEX
,
mmPutNodeMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_TOPIC
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_TOPIC
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_TOPIC
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
@@ -210,7 +211,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_VG_CHANGE_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_VG_DELETE_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_TASK
,
mmPutNodeMsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
VND
_TASK_DEPLOY_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
STREAM
_TASK_DEPLOY_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIRM_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_snode/src/smHandle.c
浏览文件 @
8477d22d
...
...
@@ -94,9 +94,10 @@ SArray *smGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MON_SM_INFO
,
smPutNodeMsgToMonitorQueue
,
0
)
==
NULL
)
goto
_OVER
;
// Requests handled by SNODE
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SND_TASK_DEPLOY
,
smPutNodeMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
/*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DEPLOY
,
smPutNodeMsgToMgmtQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RUN
,
smPutNodeMsgToMgmtQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DISPATCH
,
smPutNodeMsgToMgmtQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RECOVER
,
smPutNodeMsgToMgmtQueue
,
1
)
==
NULL
)
goto
_OVER
;
code
=
0
;
_OVER:
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
8477d22d
...
...
@@ -210,8 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
-
1
;
}
dDebug
(
"vgId:%d, start to create vnode, tsma:%d standby:%d"
,
createReq
.
vgId
,
createReq
.
isTsma
,
createReq
.
standby
);
dDebug
(
"vgId:%d, start to create vnode, tsma:%d standby:%d"
,
createReq
.
vgId
,
createReq
.
isTsma
,
createReq
.
standby
);
vmGenerateVnodeCfg
(
&
createReq
,
&
vnodeCfg
);
if
(
vmTsmaAdjustDays
(
&
vnodeCfg
,
&
createReq
)
<
0
)
{
...
...
@@ -333,11 +332,6 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_UPDATE_TAG_VAL
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TABLE_META
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TABLES_META
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_CONSUME
,
vmPutMsgToQueryQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_QUERY
,
vmPutMsgToQueryQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_CONNECT
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_DISCONNECT
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
// if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutMsgToWriteQueue, 0)== NULL) goto _OVER;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CANCEL_TASK
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_TASK
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CREATE_STB
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
@@ -352,13 +346,14 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_VG_CHANGE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_VG_DELETE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CONSUME
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
TASK_DEPLOY
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_
DELETE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_QUERY_HEARTBEAT
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_STREAM_TRIGGER
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
VND_TASK_RUN
,
vmPutMsgToFetch
Queue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
VND_TASK_DISPATCH
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
VND_TASK_RECOVER
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
VND_DELETE
,
vmPutMsgToWrite
Queue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
STREAM_TASK_DEPLOY
,
vmPutMsgToWrite
Queue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
STREAM_TASK_RUN
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
STREAM_TASK_DISPATCH
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_
STREAM_TASK_RECOVER
,
vmPutMsgToFetch
Queue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
8477d22d
...
...
@@ -95,6 +95,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t
vgId
=
ntohl
(
pHead
->
vgId
);
if
(
vgId
==
QNODE_HANDLE
)
{
pWrapper
=
&
pDnode
->
wrappers
[
QNODE
];
}
else
if
(
vgId
==
SNODE_HANDLE
)
{
pWrapper
=
&
pDnode
->
wrappers
[
SNODE
];
}
else
if
(
vgId
==
MNODE_HANDLE
)
{
pWrapper
=
&
pDnode
->
wrappers
[
MNODE
];
}
else
{
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
8477d22d
...
...
@@ -309,6 +309,7 @@ typedef struct {
int8_t
slidingUnit
;
int8_t
timezone
;
int32_t
dstVgId
;
// for stream
int64_t
dstTbUid
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
8477d22d
...
...
@@ -132,7 +132,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
plan
->
execNode
.
epSet
,
TDMT_
VND
_TASK_DEPLOY
,
pVgroup
->
vgId
);
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
plan
->
execNode
.
epSet
,
TDMT_
STREAM
_TASK_DEPLOY
,
pVgroup
->
vgId
);
return
0
;
}
...
...
@@ -156,7 +156,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
plan
->
execNode
.
epSet
,
TDMT_S
ND
_TASK_DEPLOY
,
0
);
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
plan
->
execNode
.
epSet
,
TDMT_S
TREAM
_TASK_DEPLOY
,
0
);
return
0
;
}
...
...
@@ -222,7 +222,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
// dispatch
pTask
->
dispatchType
=
TASK_DISPATCH__NONE
;
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
pTask
->
epSet
,
TDMT_
VND
_TASK_DEPLOY
,
pVgroup
->
vgId
);
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
pTask
->
epSet
,
TDMT_
STREAM
_TASK_DEPLOY
,
pVgroup
->
vgId
);
}
return
0
;
}
...
...
@@ -267,8 +267,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
// dispatch
pTask
->
dispatchType
=
TASK_DISPATCH__NONE
;
/*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
pTask
->
epSet
,
TDMT_VND_TASK_DEPLOY
,
pStream
->
fixedSinkVg
.
vgId
);
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
pTask
->
epSet
,
TDMT_STREAM_TASK_DEPLOY
,
pStream
->
fixedSinkVg
.
vgId
);
return
0
;
}
...
...
@@ -361,7 +360,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT
(
taosArrayGetSize
(
pArray
)
==
1
);
SStreamTask
*
lastLevelTask
=
taosArrayGetP
(
pArray
,
0
);
/*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/
pTask
->
dispatchMsgType
=
TDMT_
VND
_TASK_DISPATCH
;
pTask
->
dispatchMsgType
=
TDMT_
STREAM
_TASK_DISPATCH
;
pTask
->
dispatchType
=
TASK_DISPATCH__FIXED
;
pTask
->
fixedEpDispatcher
.
taskId
=
lastLevelTask
->
taskId
;
...
...
@@ -407,7 +406,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask
->
dispatchType
=
TASK_DISPATCH__SHUFFLE
;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask
->
dispatchMsgType
=
TDMT_
VND
_TASK_DISPATCH
;
pTask
->
dispatchMsgType
=
TDMT_
STREAM
_TASK_DISPATCH
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pStream
->
targetDb
);
ASSERT
(
pDb
);
if
(
mndExtractDbInfo
(
pMnode
,
pDb
,
&
pTask
->
shuffleDispatcher
.
dbInfo
,
NULL
)
<
0
)
{
...
...
@@ -438,7 +437,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
}
else
{
pTask
->
dispatchType
=
TASK_DISPATCH__FIXED
;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask
->
dispatchMsgType
=
TDMT_
VND
_TASK_DISPATCH
;
pTask
->
dispatchMsgType
=
TDMT_
STREAM
_TASK_DISPATCH
;
SArray
*
pArray
=
taosArrayGetP
(
pStream
->
tasks
,
0
);
// one sink only
ASSERT
(
taosArrayGetSize
(
pArray
)
==
1
);
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
8477d22d
...
...
@@ -40,6 +40,7 @@ static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpS
static
int32_t
mndProcessMCreateSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessMDropSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessGetSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessGetTbSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveSma
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
);
...
...
@@ -59,6 +60,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_SMA_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_SMA_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_INDEX
,
mndProcessGetSmaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_TABLE_INDEX
,
mndProcessGetTbSmaReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndRetrieveSma
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndCancelGetNextSma
);
...
...
@@ -870,6 +872,55 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return
code
;
}
static
int32_t
mndGetTableSma
(
SMnode
*
pMnode
,
STableIndexReq
*
indexReq
,
STableIndexRsp
*
rsp
,
bool
*
exist
)
{
int32_t
code
=
0
;
SSmaObj
*
pSma
=
NULL
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
STableIndexInfo
info
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_SMA
,
pIter
,
(
void
**
)
&
pSma
);
if
(
pIter
==
NULL
)
break
;
if
(
pSma
->
stb
[
0
]
!=
indexReq
->
tbFName
[
0
]
||
strcmp
(
pSma
->
stb
,
indexReq
->
tbFName
))
{
continue
;
}
info
.
intervalUnit
=
pSma
->
intervalUnit
;
info
.
slidingUnit
=
pSma
->
slidingUnit
;
info
.
interval
=
pSma
->
interval
;
info
.
offset
=
pSma
->
offset
;
info
.
sliding
=
pSma
->
sliding
;
info
.
dstTbUid
=
pSma
->
dstTbUid
;
info
.
dstVgId
=
pSma
->
dstVgId
;
info
.
expr
=
taosMemoryMalloc
(
pSma
->
exprLen
+
1
);
if
(
info
.
expr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
sdbRelease
(
pSdb
,
pSma
);
return
code
;
}
memcpy
(
info
.
expr
,
pSma
->
expr
,
pSma
->
exprLen
);
info
.
expr
[
pSma
->
exprLen
]
=
0
;
if
(
NULL
==
taosArrayPush
(
rsp
->
pIndex
,
&
info
))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
taosMemoryFree
(
info
.
expr
);
sdbRelease
(
pSdb
,
pSma
);
return
code
;
}
*
exist
=
true
;
sdbRelease
(
pSdb
,
pSma
);
}
return
code
;
}
static
int32_t
mndProcessGetSmaReq
(
SRpcMsg
*
pReq
)
{
SUserIndexReq
indexReq
=
{
0
};
SMnode
*
pMnode
=
pReq
->
info
.
node
;
...
...
@@ -916,6 +967,59 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessGetTbSmaReq
(
SRpcMsg
*
pReq
)
{
STableIndexReq
indexReq
=
{
0
};
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
code
=
-
1
;
STableIndexRsp
rsp
=
{
0
};
bool
exist
=
false
;
if
(
tDeserializeSTableIndexReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
indexReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
rsp
.
pIndex
=
taosArrayInit
(
10
,
sizeof
(
STableIndexInfo
));
if
(
NULL
==
rsp
.
pIndex
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
_OVER
;
}
code
=
mndGetTableSma
(
pMnode
,
&
indexReq
,
&
rsp
,
&
exist
);
if
(
code
)
{
goto
_OVER
;
}
if
(
!
exist
)
{
code
=
-
1
;
terrno
=
TSDB_CODE_MND_DB_INDEX_NOT_EXIST
;
}
else
{
int32_t
contLen
=
tSerializeSTableIndexRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
_OVER
;
}
tSerializeSTableIndexRsp
(
pRsp
,
contLen
,
&
rsp
);
pReq
->
info
.
rsp
=
pRsp
;
pReq
->
info
.
rspLen
=
contLen
;
code
=
0
;
}
_OVER:
if
(
code
!=
0
)
{
mError
(
"failed to get table index %s since %s"
,
indexReq
.
tbFName
,
terrstr
());
}
return
code
;
}
static
int32_t
mndRetrieveSma
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
8477d22d
...
...
@@ -54,8 +54,8 @@ int32_t mndInitStream(SMnode *pMnode) {
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_STREAM
,
mndProcessCreateStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_
VND
_TASK_DEPLOY_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_SND_TASK_DEPLOY_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_
STREAM
_TASK_DEPLOY_RSP
,
mndTransProcessRsp
);
/*mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
...
...
source/dnode/snode/src/snode.c
浏览文件 @
8477d22d
...
...
@@ -90,7 +90,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deploy
// stream stop/resume
// operator exec
if
(
pMsg
->
msgType
==
TDMT_S
ND
_TASK_DEPLOY
)
{
if
(
pMsg
->
msgType
==
TDMT_S
TREAM
_TASK_DEPLOY
)
{
void
*
msg
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
SStreamTask
*
pTask
=
taosMemoryMalloc
(
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
8477d22d
...
...
@@ -158,7 +158,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// TODO: handle error
}
break
;
case
TDMT_
VND
_TASK_DEPLOY
:
{
case
TDMT_
STREAM
_TASK_DEPLOY
:
{
if
(
tqProcessTaskDeploy
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
...
...
@@ -238,21 +238,19 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case
TDMT_VND_TABLE_META
:
return
vnodeGetTableMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
,
pInfo
->
workerId
);
case
TDMT_VND_TASK_RUN
:
{
int32_t
code
=
tqProcessTaskRunReq
(
pVnode
->
pTq
,
pMsg
);
pMsg
->
pCont
=
NULL
;
return
code
;
}
case
TDMT_VND_TASK_DISPATCH
:
case
TDMT_STREAM_TASK_RUN
:
return
tqProcessTaskRunReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_TASK_DISPATCH
:
return
tqProcessTaskDispatchReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_
VND
_TASK_RECOVER
:
case
TDMT_
STREAM
_TASK_RECOVER
:
return
tqProcessTaskRecoverReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_
VND
_TASK_DISPATCH_RSP
:
case
TDMT_
STREAM
_TASK_DISPATCH_RSP
:
return
tqProcessTaskDispatchRsp
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_
VND
_TASK_RECOVER_RSP
:
case
TDMT_
STREAM
_TASK_RECOVER_RSP
:
return
tqProcessTaskRecoverRsp
(
pVnode
->
pTq
,
pMsg
);
default:
vError
(
"unknown msg type:%d in fetch queue"
,
pMsg
->
msgType
);
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
8477d22d
...
...
@@ -310,22 +310,19 @@ typedef struct SCtgCacheOperation {
int32_t
opId
;
void
*
data
;
bool
syncOp
;
uint64_t
seqId
;
tsem_t
rspSem
;
}
SCtgCacheOperation
;
typedef
struct
SCtgQNode
{
SCtgCacheOperation
op
;
SCtgCacheOperation
*
op
;
struct
SCtgQNode
*
next
;
}
SCtgQNode
;
typedef
struct
SCtgQueue
{
SRWLatch
qlock
;
uint64_t
seqId
;
uint64_t
seqDone
;
SCtgQNode
*
head
;
SCtgQNode
*
tail
;
tsem_t
reqSem
;
tsem_t
rspSem
;
uint64_t
qRemainNum
;
}
SCtgQueue
;
...
...
@@ -493,6 +490,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
int32_t
ctgGetQnodeListFromMnode
(
CTG_PARAMS
,
SArray
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetDBCfgFromMnode
(
CTG_PARAMS
,
const
char
*
dbFName
,
SDbCfgInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetIndexInfoFromMnode
(
CTG_PARAMS
,
const
char
*
indexName
,
SIndexInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetTbIndexFromMnode
(
CTG_PARAMS
,
const
char
*
tbFName
,
SArray
**
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetUdfInfoFromMnode
(
CTG_PARAMS
,
const
char
*
funcName
,
SFuncInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetUserDbAuthFromMnode
(
CTG_PARAMS
,
const
char
*
user
,
SGetUserAuthRsp
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetTbMetaFromMnodeImpl
(
CTG_PARAMS
,
char
*
dbFName
,
char
*
tbName
,
STableMetaOutput
*
out
,
SCtgTask
*
pTask
);
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
8477d22d
...
...
@@ -506,11 +506,6 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET
(
TSDB_CODE_CTG_SYS_ERROR
);
}
if
(
tsem_init
(
&
gCtgMgmt
.
queue
.
rspSem
,
0
,
0
))
{
qError
(
"tsem_init failed, error:%s"
,
tstrerror
(
TAOS_SYSTEM_ERROR
(
errno
)));
CTG_ERR_RET
(
TSDB_CODE_CTG_SYS_ERROR
);
}
gCtgMgmt
.
queue
.
head
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgQNode
));
if
(
NULL
==
gCtgMgmt
.
queue
.
head
)
{
qError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgQNode
));
...
...
@@ -1141,6 +1136,17 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE
(
ctgGetIndexInfoFromMnode
(
CTG_PARAMS_LIST
(),
indexName
,
pInfo
,
NULL
));
}
int32_t
catalogGetTableIndex
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
tbFName
,
SArray
**
pRes
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pTrans
||
NULL
==
pMgmtEps
||
NULL
==
tbFName
||
NULL
==
pRes
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_API_LEAVE
(
ctgGetTbIndexFromMnode
(
CTG_PARAMS_LIST
(),
tbFName
,
pRes
,
NULL
));
}
int32_t
catalogGetUdfInfo
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
funcName
,
SFuncInfo
*
pInfo
)
{
CTG_API_ENTER
();
...
...
@@ -1195,10 +1201,6 @@ void catalogDestroy(void) {
qError
(
"tsem_post failed, error:%s"
,
tstrerror
(
TAOS_SYSTEM_ERROR
(
errno
)));
}
if
(
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
))
{
qError
(
"tsem_post failed, error:%s"
,
tstrerror
(
TAOS_SYSTEM_ERROR
(
errno
)));
}
while
(
CTG_IS_LOCKED
(
&
gCtgMgmt
.
lock
))
{
taosUsleep
(
1
);
}
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
8477d22d
...
...
@@ -501,25 +501,6 @@ _return:
return
TSDB_CODE_SUCCESS
;
}
void
ctgWaitOpDone
(
SCtgCacheOperation
*
action
)
{
while
(
true
)
{
tsem_wait
(
&
gCtgMgmt
.
queue
.
rspSem
);
if
(
atomic_load_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
))
{
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
break
;
}
if
(
gCtgMgmt
.
queue
.
seqDone
>=
action
->
seqId
)
{
break
;
}
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
sched_yield
();
}
}
void
ctgDequeue
(
SCtgCacheOperation
**
op
)
{
SCtgQNode
*
orig
=
gCtgMgmt
.
queue
.
head
;
...
...
@@ -530,7 +511,7 @@ void ctgDequeue(SCtgCacheOperation **op) {
taosMemoryFreeClear
(
orig
);
*
op
=
&
node
->
op
;
*
op
=
node
->
op
;
}
...
...
@@ -541,9 +522,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
CTG_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
operation
->
seqId
=
atomic_add_fetch_64
(
&
gCtgMgmt
.
queue
.
seqId
,
1
);
if
(
operation
->
syncOp
)
{
tsem_init
(
&
operation
->
rspSem
,
0
,
0
);
}
node
->
op
=
*
operation
;
node
->
op
=
operation
;
CTG_LOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
queue
.
qlock
);
gCtgMgmt
.
queue
.
tail
->
next
=
node
;
...
...
@@ -558,7 +541,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
ctgDebug
(
"action [%s] added into queue"
,
gCtgCacheOperation
[
operation
->
opId
].
name
);
if
(
operation
->
syncOp
)
{
ctgWaitOpDone
(
operation
);
tsem_wait
(
&
operation
->
rspSem
);
taosMemoryFree
(
operation
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -567,7 +551,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t
ctgDropDbCacheEnqueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
)
{
int32_t
code
=
0
;
SCtgCacheOperation
action
=
{.
opId
=
CTG_OP_DROP_DB_CACHE
};
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_DROP_DB_CACHE
;
SCtgDropDBMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgDropDBMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgDropDBMsg
));
...
...
@@ -583,21 +569,24 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
msg
->
dbId
=
dbId
;
action
.
data
=
msg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
&
action
));
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
_return:
taosMemoryFreeClear
(
action
.
data
);
taosMemoryFreeClear
(
op
->
data
);
CTG_RET
(
code
);
}
int32_t
ctgDropDbVgroupEnqueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
bool
syncOp
)
{
int32_t
code
=
0
;
SCtgCacheOperation
action
=
{.
opId
=
CTG_OP_DROP_DB_VGROUP
,
.
syncOp
=
syncOp
};
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_DROP_DB_VGROUP
;
op
->
syncOp
=
syncOp
;
SCtgDropDbVgroupMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgDropDbVgroupMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgDropDbVgroupMsg
));
...
...
@@ -612,15 +601,15 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp)
msg
->
pCtg
=
pCtg
;
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
action
.
data
=
msg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
&
action
));
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
_return:
taosMemoryFreeClear
(
action
.
data
);
taosMemoryFreeClear
(
op
->
data
);
CTG_RET
(
code
);
}
...
...
@@ -628,7 +617,10 @@ _return:
int32_t
ctgDropStbMetaEnqueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
,
const
char
*
stbName
,
uint64_t
suid
,
bool
syncOp
)
{
int32_t
code
=
0
;
SCtgCacheOperation
action
=
{.
opId
=
CTG_OP_DROP_STB_META
,
.
syncOp
=
syncOp
};
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_DROP_STB_META
;
op
->
syncOp
=
syncOp
;
SCtgDropStbMetaMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgDropStbMetaMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgDropStbMetaMsg
));
...
...
@@ -641,15 +633,15 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
msg
->
dbId
=
dbId
;
msg
->
suid
=
suid
;
action
.
data
=
msg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
&
action
));
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
_return:
taosMemoryFreeClear
(
action
.
data
);
taosMemoryFreeClear
(
op
->
data
);
CTG_RET
(
code
);
}
...
...
@@ -657,7 +649,10 @@ _return:
int32_t
ctgDropTbMetaEnqueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
,
const
char
*
tbName
,
bool
syncOp
)
{
int32_t
code
=
0
;
SCtgCacheOperation
action
=
{.
opId
=
CTG_OP_DROP_TB_META
,
.
syncOp
=
syncOp
};
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_DROP_TB_META
;
op
->
syncOp
=
syncOp
;
SCtgDropTblMetaMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgDropTblMetaMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgDropTblMetaMsg
));
...
...
@@ -669,21 +664,24 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
strncpy
(
msg
->
tbName
,
tbName
,
sizeof
(
msg
->
tbName
));
msg
->
dbId
=
dbId
;
action
.
data
=
msg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
&
action
));
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
_return:
taosMemoryFreeClear
(
action
.
data
);
taosMemoryFreeClear
(
op
->
data
);
CTG_RET
(
code
);
}
int32_t
ctgUpdateVgroupEnqueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
,
SDBVgInfo
*
dbInfo
,
bool
syncOp
)
{
int32_t
code
=
0
;
SCtgCacheOperation
action
=
{.
opId
=
CTG_OP_UPDATE_VGROUP
,
.
syncOp
=
syncOp
};
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_UPDATE_VGROUP
;
op
->
syncOp
=
syncOp
;
SCtgUpdateVgMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgUpdateVgMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgUpdateVgMsg
));
...
...
@@ -701,22 +699,25 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
msg
->
dbId
=
dbId
;
msg
->
dbInfo
=
dbInfo
;
action
.
data
=
msg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
&
action
));
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
_return:
ctgFreeVgInfo
(
dbInfo
);
taosMemoryFreeClear
(
action
.
data
);
taosMemoryFreeClear
(
op
->
data
);
CTG_RET
(
code
);
}
int32_t
ctgUpdateTbMetaEnqueue
(
SCatalog
*
pCtg
,
STableMetaOutput
*
output
,
bool
syncOp
)
{
int32_t
code
=
0
;
SCtgCacheOperation
action
=
{.
opId
=
CTG_OP_UPDATE_TB_META
,
.
syncOp
=
syncOp
};
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_UPDATE_TB_META
;
op
->
syncOp
=
syncOp
;
SCtgUpdateTblMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgUpdateTblMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgUpdateTblMsg
));
...
...
@@ -731,9 +732,9 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
msg
->
pCtg
=
pCtg
;
msg
->
output
=
output
;
action
.
data
=
msg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
&
action
));
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -746,7 +747,9 @@ _return:
int32_t
ctgUpdateVgEpsetEnqueue
(
SCatalog
*
pCtg
,
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
pEpSet
)
{
int32_t
code
=
0
;
SCtgCacheOperation
operation
=
{.
opId
=
CTG_OP_UPDATE_VG_EPSET
};
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_UPDATE_VG_EPSET
;
SCtgUpdateEpsetMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgUpdateEpsetMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgUpdateEpsetMsg
));
...
...
@@ -758,9 +761,9 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp
msg
->
vgId
=
vgId
;
msg
->
epSet
=
*
pEpSet
;
op
eration
.
data
=
msg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
&
operation
));
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -775,7 +778,10 @@ _return:
int32_t
ctgUpdateUserEnqueue
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
,
bool
syncOp
)
{
int32_t
code
=
0
;
SCtgCacheOperation
action
=
{.
opId
=
CTG_OP_UPDATE_USER
,
.
syncOp
=
syncOp
};
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_UPDATE_USER
;
op
->
syncOp
=
syncOp
;
SCtgUpdateUserMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgUpdateUserMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgUpdateUserMsg
));
...
...
@@ -785,9 +791,9 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp
msg
->
pCtg
=
pCtg
;
msg
->
userAuth
=
*
pAuth
;
action
.
data
=
msg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
&
action
));
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1607,6 +1613,39 @@ void ctgUpdateThreadUnexpectedStopped(void) {
if
(
CTG_IS_LOCKED
(
&
gCtgMgmt
.
lock
)
>
0
)
CTG_UNLOCK
(
CTG_READ
,
&
gCtgMgmt
.
lock
);
}
void
ctgCleanupCacheQueue
(
void
)
{
SCtgQNode
*
node
=
NULL
;
SCtgQNode
*
nodeNext
=
NULL
;
while
(
true
)
{
node
=
gCtgMgmt
.
queue
.
head
->
next
;
while
(
node
)
{
if
(
node
->
op
)
{
taosMemoryFree
(
node
->
op
->
data
);
if
(
node
->
op
->
syncOp
)
{
tsem_post
(
&
node
->
op
->
rspSem
);
}
else
{
taosMemoryFree
(
node
->
op
);
}
}
nodeNext
=
node
->
next
;
taosMemoryFree
(
node
);
node
=
nodeNext
;
}
if
(
CTG_IS_LOCKED
(
&
gCtgMgmt
.
lock
))
{
taosUsleep
(
1
);
}
else
{
break
;
}
}
taosMemoryFreeClear
(
gCtgMgmt
.
queue
.
head
);
gCtgMgmt
.
queue
.
tail
=
NULL
;
}
void
*
ctgUpdateThreadFunc
(
void
*
param
)
{
setThreadName
(
"catalog"
);
#ifdef WINDOWS
...
...
@@ -1622,7 +1661,8 @@ void* ctgUpdateThreadFunc(void* param) {
}
if
(
atomic_load_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
))
{
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
CTG_UNLOCK
(
CTG_READ
,
&
gCtgMgmt
.
lock
);
ctgCleanupCacheQueue
();
break
;
}
...
...
@@ -1634,10 +1674,8 @@ void* ctgUpdateThreadFunc(void* param) {
(
*
gCtgCacheOperation
[
operation
->
opId
].
func
)(
operation
);
gCtgMgmt
.
queue
.
seqDone
=
operation
->
seqId
;
if
(
operation
->
syncOp
)
{
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
tsem_post
(
&
operation
->
rspSem
);
}
CTG_RT_STAT_INC
(
qDoneNum
,
1
);
...
...
source/libs/catalog/src/ctgRemote.c
浏览文件 @
8477d22d
...
...
@@ -85,6 +85,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug
(
"Got index from mnode, indexName:%s"
,
target
);
break
;
}
case
TDMT_MND_GET_TABLE_INDEX
:
{
if
(
TSDB_CODE_SUCCESS
!=
rspCode
)
{
qError
(
"error rsp for get table index, error:%s, tbFName:%s"
,
tstrerror
(
rspCode
),
target
);
CTG_ERR_RET
(
rspCode
);
}
code
=
queryProcessMsgRsp
[
TMSG_INDEX
(
reqType
)](
out
,
msg
,
msgSize
);
if
(
code
)
{
qError
(
"Process get table index rsp failed, error:%s, tbFName:%s"
,
tstrerror
(
code
),
target
);
CTG_ERR_RET
(
code
);
}
qDebug
(
"Got table index from mnode, tbFName:%s"
,
target
);
break
;
}
case
TDMT_MND_RETRIEVE_FUNC
:
{
if
(
TSDB_CODE_SUCCESS
!=
rspCode
)
{
qError
(
"error rsp for get udf, error:%s, funcName:%s"
,
tstrerror
(
rspCode
),
target
);
...
...
@@ -412,6 +427,44 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTbIndexFromMnode
(
CTG_PARAMS
,
const
char
*
tbFName
,
SArray
**
out
,
SCtgTask
*
pTask
)
{
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_GET_TABLE_INDEX
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get tb index from mnode, tbFName:%s"
,
tbFName
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
tbFName
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build get index msg failed, code:%s, tbFName:%s"
,
tstrerror
(
code
),
tbFName
);
CTG_ERR_RET
(
code
);
}
if
(
pTask
)
{
void
*
pOut
=
taosMemoryCalloc
(
1
,
POINTER_BYTES
);
if
(
NULL
==
pOut
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
(
char
*
)
tbFName
));
CTG_RET
(
ctgAsyncSendMsg
(
CTG_PARAMS_LIST
(),
pTask
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
.
msgType
=
reqType
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pTrans
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
CTG_ERR_RET
(
ctgProcessRspMsg
(
out
,
reqType
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
,
rpcRsp
.
code
,
(
char
*
)
tbFName
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetUdfInfoFromMnode
(
CTG_PARAMS
,
const
char
*
funcName
,
SFuncInfo
*
out
,
SCtgTask
*
pTask
)
{
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
8477d22d
...
...
@@ -449,6 +449,34 @@ static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) {
return
(
SNode
*
)
pDst
;
}
static
SNode
*
physiNodeCopy
(
const
SPhysiNode
*
pSrc
,
SPhysiNode
*
pDst
)
{
CLONE_NODE_FIELD
(
pOutputDataBlockDesc
);
CLONE_NODE_FIELD
(
pConditions
);
CLONE_NODE_LIST_FIELD
(
pChildren
);
return
(
SNode
*
)
pDst
;
}
static
SNode
*
physiWindowCopy
(
const
SWinodwPhysiNode
*
pSrc
,
SWinodwPhysiNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
physiNodeCopy
);
CLONE_NODE_LIST_FIELD
(
pExprs
);
CLONE_NODE_LIST_FIELD
(
pFuncs
);
CLONE_NODE_FIELD
(
pTspk
);
COPY_SCALAR_FIELD
(
triggerType
);
COPY_SCALAR_FIELD
(
watermark
);
COPY_SCALAR_FIELD
(
filesFactor
);
return
(
SNode
*
)
pDst
;
}
static
SNode
*
physiIntervalCopy
(
const
SIntervalPhysiNode
*
pSrc
,
SIntervalPhysiNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
window
,
physiWindowCopy
);
COPY_SCALAR_FIELD
(
interval
);
COPY_SCALAR_FIELD
(
offset
);
COPY_SCALAR_FIELD
(
sliding
);
COPY_SCALAR_FIELD
(
intervalUnit
);
COPY_SCALAR_FIELD
(
slidingUnit
);
return
(
SNode
*
)
pDst
;
}
static
SNode
*
dataBlockDescCopy
(
const
SDataBlockDescNode
*
pSrc
,
SDataBlockDescNode
*
pDst
)
{
COPY_SCALAR_FIELD
(
dataBlockId
);
CLONE_NODE_LIST_FIELD
(
pSlots
);
...
...
@@ -575,6 +603,12 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return
logicIndefRowsFuncCopy
((
const
SIndefRowsFuncLogicNode
*
)
pNode
,
(
SIndefRowsFuncLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_SUBPLAN
:
return
logicSubplanCopy
((
const
SLogicSubplan
*
)
pNode
,
(
SLogicSubplan
*
)
pDst
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
return
physiIntervalCopy
((
const
SIntervalPhysiNode
*
)
pNode
,
(
SIntervalPhysiNode
*
)
pDst
);
default:
break
;
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
8477d22d
...
...
@@ -164,9 +164,14 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SCreateStreamStmt
));
case
QUERY_NODE_DROP_STREAM_STMT
:
return
makeNode
(
type
,
sizeof
(
SDropStreamStmt
));
case
QUERY_NODE_BALANCE_VGROUP_STMT
:
return
makeNode
(
type
,
sizeof
(
SBalanceVgroupStmt
));
case
QUERY_NODE_MERGE_VGROUP_STMT
:
return
makeNode
(
type
,
sizeof
(
SMergeVgroupStmt
));
case
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT
:
return
makeNode
(
type
,
sizeof
(
SRedistributeVgroupStmt
));
case
QUERY_NODE_SPLIT_VGROUP_STMT
:
return
makeNode
(
type
,
sizeof
(
SSplitVgroupStmt
));
case
QUERY_NODE_SYNCDB_STMT
:
break
;
case
QUERY_NODE_GRANT_STMT
:
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
8477d22d
...
...
@@ -187,6 +187,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const
SNode
*
pOptions
,
SNode
*
pQuery
);
SNode
*
createDropStreamStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
const
SToken
*
pStreamName
);
SNode
*
createKillStmt
(
SAstCreateContext
*
pCxt
,
ENodeType
type
,
const
SToken
*
pId
);
SNode
*
createBalanceVgroupStmt
(
SAstCreateContext
*
pCxt
);
SNode
*
createMergeVgroupStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pVgId1
,
const
SToken
*
pVgId2
);
SNode
*
createRedistributeVgroupStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pVgId
,
SNodeList
*
pDnodes
);
SNode
*
createSplitVgroupStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pVgId
);
...
...
source/libs/parser/inc/sql.y
浏览文件 @
8477d22d
...
...
@@ -469,9 +469,10 @@ cmd ::= KILL QUERY NK_INTEGER(A).
cmd ::= KILL TRANSACTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_TRANSACTION_STMT, &A); }
/************************************************ merge/redistribute/ vgroup ******************************************/
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
//
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
%type dnode_list { SNodeList* }
%destructor dnode_list { nodesDestroyList($$); }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
8477d22d
...
...
@@ -1435,25 +1435,37 @@ SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId
return
(
SNode
*
)
pStmt
;
}
SNode
*
createBalanceVgroupStmt
(
SAstCreateContext
*
pCxt
)
{
CHECK_PARSER_STATUS
(
pCxt
);
SBalanceVgroupStmt
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_BALANCE_VGROUP_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
return
(
SNode
*
)
pStmt
;
}
SNode
*
createMergeVgroupStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pVgId1
,
const
SToken
*
pVgId2
)
{
CHECK_PARSER_STATUS
(
pCxt
);
S
Node
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_MERGE_VGROUP_STMT
);
S
MergeVgroupStmt
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_MERGE_VGROUP_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
return
pStmt
;
pStmt
->
vgId1
=
taosStr2Int32
(
pVgId1
->
z
,
NULL
,
10
);
pStmt
->
vgId2
=
taosStr2Int32
(
pVgId2
->
z
,
NULL
,
10
);
return
(
SNode
*
)
pStmt
;
}
SNode
*
createRedistributeVgroupStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pVgId
,
SNodeList
*
pDnodes
)
{
CHECK_PARSER_STATUS
(
pCxt
);
S
Node
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT
);
S
RedistributeVgroupStmt
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
return
pStmt
;
pStmt
->
vgId
=
taosStr2Int32
(
pVgId
->
z
,
NULL
,
10
);
pStmt
->
pDnodes
=
pDnodes
;
return
(
SNode
*
)
pStmt
;
}
SNode
*
createSplitVgroupStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pVgId
)
{
CHECK_PARSER_STATUS
(
pCxt
);
S
Node
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_SPLIT_VGROUP_STMT
);
S
SplitVgroupStmt
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_SPLIT_VGROUP_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
return
pStmt
;
pStmt
->
vgId
=
taosStr2Int32
(
pVgId
->
z
,
NULL
,
10
);
return
(
SNode
*
)
pStmt
;
}
SNode
*
createSyncdbStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pDbName
)
{
...
...
source/libs/parser/src/parTokenizer.c
浏览文件 @
8477d22d
...
...
@@ -41,6 +41,7 @@ static SKeyword keywordTable[] = {
{
"AS"
,
TK_AS
},
{
"ASC"
,
TK_ASC
},
{
"AT_ONCE"
,
TK_AT_ONCE
},
{
"BALANCE"
,
TK_BALANCE
},
{
"BETWEEN"
,
TK_BETWEEN
},
{
"BINARY"
,
TK_BINARY
},
{
"BIGINT"
,
TK_BIGINT
},
...
...
@@ -116,6 +117,7 @@ static SKeyword keywordTable[] = {
{
"LOCAL"
,
TK_LOCAL
},
{
"MATCH"
,
TK_MATCH
},
{
"MAXROWS"
,
TK_MAXROWS
},
{
"MERGE"
,
TK_MERGE
},
{
"MINROWS"
,
TK_MINROWS
},
{
"MINUS"
,
TK_MINUS
},
{
"MNODE"
,
TK_MNODE
},
...
...
@@ -151,6 +153,7 @@ static SKeyword keywordTable[] = {
{
"QUERY"
,
TK_QUERY
},
{
"RATIO"
,
TK_RATIO
},
{
"READ"
,
TK_READ
},
{
"REDISTRIBUTE"
,
TK_REDISTRIBUTE
},
{
"RENAME"
,
TK_RENAME
},
{
"REPLICA"
,
TK_REPLICA
},
{
"RESET"
,
TK_RESET
},
...
...
@@ -171,6 +174,7 @@ static SKeyword keywordTable[] = {
{
"SNODE"
,
TK_SNODE
},
{
"SNODES"
,
TK_SNODES
},
{
"SOFFSET"
,
TK_SOFFSET
},
// {"SPLIT", TK_SPLIT},
{
"STABLE"
,
TK_STABLE
},
{
"STABLES"
,
TK_STABLES
},
{
"STATE"
,
TK_STATE
},
...
...
@@ -208,6 +212,7 @@ static SKeyword keywordTable[] = {
{
"VARCHAR"
,
TK_VARCHAR
},
{
"VARIABLES"
,
TK_VARIABLES
},
{
"VERBOSE"
,
TK_VERBOSE
},
{
"VGROUP"
,
TK_VGROUP
},
{
"VGROUPS"
,
TK_VGROUPS
},
{
"VNODES"
,
TK_VNODES
},
{
"WAL"
,
TK_WAL
},
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
8477d22d
...
...
@@ -847,9 +847,9 @@ static EDealRes translateJsonOperator(STranslateContext* pCxt, SOperatorNode* pO
if
(
TSDB_DATA_TYPE_JSON
!=
ldt
.
type
||
TSDB_DATA_TYPE_BINARY
!=
rdt
.
type
)
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
}
if
(
pOp
->
opType
==
OP_TYPE_JSON_GET_VALUE
)
{
if
(
pOp
->
opType
==
OP_TYPE_JSON_GET_VALUE
)
{
pOp
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_JSON
;
}
else
if
(
pOp
->
opType
==
OP_TYPE_JSON_CONTAINS
)
{
}
else
if
(
pOp
->
opType
==
OP_TYPE_JSON_CONTAINS
)
{
pOp
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_BOOL
;
}
pOp
->
node
.
resType
.
bytes
=
tDataTypes
[
pOp
->
node
.
resType
.
type
].
bytes
;
...
...
@@ -3611,6 +3611,55 @@ static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
return
buildCmdMsg
(
pCxt
,
TDMT_MND_ALTER_USER
,
(
FSerializeFunc
)
tSerializeSAlterUserReq
,
&
req
);
}
static
int32_t
translateBalanceVgroup
(
STranslateContext
*
pCxt
,
SBalanceVgroupStmt
*
pStmt
)
{
SBalanceVgroupReq
req
=
{
0
};
return
buildCmdMsg
(
pCxt
,
TDMT_MND_BALANCE_VGROUP
,
(
FSerializeFunc
)
tSerializeSBalanceVgroupReq
,
&
req
);
}
static
int32_t
translateMergeVgroup
(
STranslateContext
*
pCxt
,
SMergeVgroupStmt
*
pStmt
)
{
SMergeVgroupReq
req
=
{.
vgId1
=
pStmt
->
vgId1
,
.
vgId2
=
pStmt
->
vgId2
};
return
buildCmdMsg
(
pCxt
,
TDMT_MND_MERGE_VGROUP
,
(
FSerializeFunc
)
tSerializeSMergeVgroupReq
,
&
req
);
}
static
int32_t
checkDnodeIds
(
STranslateContext
*
pCxt
,
SRedistributeVgroupStmt
*
pStmt
)
{
int32_t
numOfDnodes
=
LIST_LENGTH
(
pStmt
->
pDnodes
);
if
(
numOfDnodes
>
3
||
numOfDnodes
<
1
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG
);
}
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pStmt
->
pDnodes
)
{
SValueNode
*
pVal
=
(
SValueNode
*
)
pNode
;
if
(
DEAL_RES_ERROR
==
translateValue
(
pCxt
,
pVal
))
{
return
pCxt
->
errCode
;
}
}
pStmt
->
dnodeId1
=
getBigintFromValueNode
((
SValueNode
*
)
nodesListGetNode
(
pStmt
->
pDnodes
,
0
));
pStmt
->
dnodeId2
=
-
1
;
pStmt
->
dnodeId3
=
-
1
;
if
(
numOfDnodes
>
1
)
{
pStmt
->
dnodeId2
=
getBigintFromValueNode
((
SValueNode
*
)
nodesListGetNode
(
pStmt
->
pDnodes
,
1
));
}
if
(
numOfDnodes
>
2
)
{
pStmt
->
dnodeId3
=
getBigintFromValueNode
((
SValueNode
*
)
nodesListGetNode
(
pStmt
->
pDnodes
,
2
));
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateRedistributeVgroup
(
STranslateContext
*
pCxt
,
SRedistributeVgroupStmt
*
pStmt
)
{
SRedistributeVgroupReq
req
=
{.
vgId
=
pStmt
->
vgId
};
int32_t
code
=
checkDnodeIds
(
pCxt
,
pStmt
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
req
.
dnodeId1
=
pStmt
->
dnodeId1
;
req
.
dnodeId2
=
pStmt
->
dnodeId2
;
req
.
dnodeId3
=
pStmt
->
dnodeId3
;
code
=
buildCmdMsg
(
pCxt
,
TDMT_MND_REDISTRIBUTE_VGROUP
,
(
FSerializeFunc
)
tSerializeSRedistributeVgroupReq
,
&
req
);
}
return
code
;
}
static
int32_t
translateQuery
(
STranslateContext
*
pCxt
,
SNode
*
pNode
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
nodeType
(
pNode
))
{
...
...
@@ -3733,6 +3782,15 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case
QUERY_NODE_REVOKE_STMT
:
code
=
translateRevoke
(
pCxt
,
(
SRevokeStmt
*
)
pNode
);
break
;
case
QUERY_NODE_BALANCE_VGROUP_STMT
:
code
=
translateBalanceVgroup
(
pCxt
,
(
SBalanceVgroupStmt
*
)
pNode
);
break
;
case
QUERY_NODE_MERGE_VGROUP_STMT
:
code
=
translateMergeVgroup
(
pCxt
,
(
SMergeVgroupStmt
*
)
pNode
);
break
;
case
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT
:
code
=
translateRedistributeVgroup
(
pCxt
,
(
SRedistributeVgroupStmt
*
)
pNode
);
break
;
default:
break
;
}
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
8477d22d
...
...
@@ -180,6 +180,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return
"Value too long for column/tag: %s"
;
case
TSDB_CODE_PAR_INVALID_DELETE_WHERE
:
return
"The DELETE statement must have a definite time window range"
;
case
TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG
:
return
"The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes"
;
case
TSDB_CODE_OUT_OF_MEMORY
:
return
"Out of memory"
;
default:
...
...
source/libs/parser/src/sql.c
浏览文件 @
8477d22d
此差异已折叠。
点击以展开。
source/libs/parser/test/parExplainToSyncdbTest.cpp
浏览文件 @
8477d22d
...
...
@@ -19,7 +19,7 @@ using namespace std;
namespace
ParserTest
{
class
ParserExplainToSyncdbTest
:
public
Parser
TestBase
{};
class
ParserExplainToSyncdbTest
:
public
Parser
DdlTest
{};
TEST_F
(
ParserExplainToSyncdbTest
,
explain
)
{
useDb
(
"root"
,
"test"
);
...
...
@@ -43,8 +43,62 @@ TEST_F(ParserExplainToSyncdbTest, grant) {
// todo kill connection
// todo kill query
// todo kill stream
// todo merge vgroup
// todo redistribute vgroup
TEST_F
(
ParserExplainToSyncdbTest
,
mergeVgroup
)
{
useDb
(
"root"
,
"test"
);
SMergeVgroupReq
expect
=
{
0
};
auto
setMergeVgroupReqFunc
=
[
&
](
int32_t
vgId1
,
int32_t
vgId2
)
{
expect
.
vgId1
=
vgId1
;
expect
.
vgId2
=
vgId2
;
};
setCheckDdlFunc
([
&
](
const
SQuery
*
pQuery
,
ParserStage
stage
)
{
ASSERT_EQ
(
nodeType
(
pQuery
->
pRoot
),
QUERY_NODE_MERGE_VGROUP_STMT
);
ASSERT_EQ
(
pQuery
->
pCmdMsg
->
msgType
,
TDMT_MND_MERGE_VGROUP
);
SMergeVgroupReq
req
=
{
0
};
ASSERT_EQ
(
tDeserializeSMergeVgroupReq
(
pQuery
->
pCmdMsg
->
pMsg
,
pQuery
->
pCmdMsg
->
msgLen
,
&
req
),
TSDB_CODE_SUCCESS
);
ASSERT_EQ
(
req
.
vgId1
,
expect
.
vgId1
);
ASSERT_EQ
(
req
.
vgId2
,
expect
.
vgId2
);
});
setMergeVgroupReqFunc
(
1
,
2
);
run
(
"MERGE VGROUP 1 2"
);
}
TEST_F
(
ParserExplainToSyncdbTest
,
redistributeVgroup
)
{
useDb
(
"root"
,
"test"
);
SRedistributeVgroupReq
expect
=
{
0
};
auto
setRedistributeVgroupReqFunc
=
[
&
](
int32_t
vgId
,
int32_t
dnodeId1
,
int32_t
dnodeId2
=
-
1
,
int32_t
dnodeId3
=
-
1
)
{
expect
.
vgId
=
vgId
;
expect
.
dnodeId1
=
dnodeId1
;
expect
.
dnodeId2
=
dnodeId2
;
expect
.
dnodeId3
=
dnodeId3
;
};
setCheckDdlFunc
([
&
](
const
SQuery
*
pQuery
,
ParserStage
stage
)
{
ASSERT_EQ
(
nodeType
(
pQuery
->
pRoot
),
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT
);
ASSERT_EQ
(
pQuery
->
pCmdMsg
->
msgType
,
TDMT_MND_REDISTRIBUTE_VGROUP
);
SRedistributeVgroupReq
req
=
{
0
};
ASSERT_EQ
(
tDeserializeSRedistributeVgroupReq
(
pQuery
->
pCmdMsg
->
pMsg
,
pQuery
->
pCmdMsg
->
msgLen
,
&
req
),
TSDB_CODE_SUCCESS
);
ASSERT_EQ
(
req
.
vgId
,
expect
.
vgId
);
ASSERT_EQ
(
req
.
dnodeId1
,
expect
.
dnodeId1
);
ASSERT_EQ
(
req
.
dnodeId2
,
expect
.
dnodeId2
);
ASSERT_EQ
(
req
.
dnodeId3
,
expect
.
dnodeId3
);
});
setRedistributeVgroupReqFunc
(
3
,
1
);
run
(
"REDISTRIBUTE VGROUP 3 DNODE 1"
);
setRedistributeVgroupReqFunc
(
5
,
10
,
20
,
30
);
run
(
"REDISTRIBUTE VGROUP 5 DNODE 10 DNODE 20 DNODE 30"
);
}
// todo reset query cache
TEST_F
(
ParserExplainToSyncdbTest
,
revoke
)
{
...
...
source/libs/parser/test/parInitialATest.cpp
浏览文件 @
8477d22d
...
...
@@ -305,6 +305,19 @@ TEST_F(ParserInitialATest, alterUser) {
run
(
"ALTER user wxy privilege 'write'"
);
}
TEST_F
(
ParserInitialATest
,
balanceVgroup
)
{
useDb
(
"root"
,
"test"
);
setCheckDdlFunc
([
&
](
const
SQuery
*
pQuery
,
ParserStage
stage
)
{
ASSERT_EQ
(
nodeType
(
pQuery
->
pRoot
),
QUERY_NODE_BALANCE_VGROUP_STMT
);
ASSERT_EQ
(
pQuery
->
pCmdMsg
->
msgType
,
TDMT_MND_BALANCE_VGROUP
);
SBalanceVgroupReq
req
=
{
0
};
ASSERT_EQ
(
tDeserializeSBalanceVgroupReq
(
pQuery
->
pCmdMsg
->
pMsg
,
pQuery
->
pCmdMsg
->
msgLen
,
&
req
),
TSDB_CODE_SUCCESS
);
});
run
(
"BALANCE VGROUP"
);
}
TEST_F
(
ParserInitialATest
,
bug001
)
{
useDb
(
"root"
,
"test"
);
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
8477d22d
...
...
@@ -203,6 +203,24 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetTbIndexMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
STableIndexReq
indexReq
=
{
0
};
strcpy
(
indexReq
.
tbFName
,
input
);
int32_t
bufLen
=
tSerializeSTableIndexReq
(
NULL
,
0
,
&
indexReq
);
void
*
pBuf
=
(
*
mallcFp
)(
bufLen
);
tSerializeSTableIndexReq
(
pBuf
,
bufLen
,
&
indexReq
);
*
msg
=
pBuf
;
*
msgLen
=
bufLen
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
SUseDbOutput
*
pOut
=
output
;
...
...
@@ -459,6 +477,22 @@ int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessGetTbIndexRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
STableIndexRsp
out
=
{
0
};
if
(
tDeserializeSTableIndexRsp
(
msg
,
msgSize
,
&
out
)
!=
0
)
{
qError
(
"tDeserializeSTableIndexRsp failed, msgSize:%d"
,
msgSize
);
return
TSDB_CODE_INVALID_MSG
;
}
*
(
void
**
)
output
=
out
.
pIndex
;
return
TSDB_CODE_SUCCESS
;
}
void
initQueryModuleMsgHandle
()
{
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
...
...
@@ -469,7 +503,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_INDEX
)]
=
queryBuildGetIndexMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
queryBuildRetrieveFuncMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
queryBuildGetUserAuthMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_TABLE_INDEX
)]
=
queryBuildGetTbIndexMsg
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
...
...
@@ -479,6 +513,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_INDEX
)]
=
queryProcessGetIndexRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
queryProcessRetrieveFuncRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
queryProcessGetUserAuthRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_TABLE_INDEX
)]
=
queryProcessGetTbIndexRsp
;
}
#pragma GCC diagnostic pop
source/libs/qworker/inc/qwInt.h
浏览文件 @
8477d22d
...
...
@@ -356,7 +356,7 @@ int32_t qwOpenRef(void);
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
);
int32_t
qwUpdateTimeInQueue
(
SQWorker
*
mgmt
,
int64_t
ts
,
EQueueType
type
);
int64_t
qwGetTimeInQueue
(
SQWorker
*
mgmt
,
EQueueType
type
);
void
qwClearExpiredSch
(
SArray
*
pExpiredSch
);
void
qwClearExpiredSch
(
S
QWorker
*
mgmt
,
S
Array
*
pExpiredSch
);
int32_t
qwAcquireScheduler
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
);
void
qwFreeTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
);
...
...
source/libs/qworker/src/qwUtil.c
浏览文件 @
8477d22d
...
...
@@ -539,8 +539,23 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
}
void
qwClearExpiredSch
(
SArray
*
pExpiredSch
)
{
void
qwClearExpiredSch
(
SQWorker
*
mgmt
,
SArray
*
pExpiredSch
)
{
int32_t
num
=
taosArrayGetSize
(
pExpiredSch
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
uint64_t
*
sId
=
taosArrayGet
(
pExpiredSch
,
i
);
SQWSchStatus
*
pSch
=
NULL
;
if
(
qwAcquireScheduler
(
mgmt
,
*
sId
,
QW_WRITE
,
&
pSch
))
{
continue
;
}
if
(
taosHashGetSize
(
pSch
->
tasksHash
)
<=
0
)
{
qwDestroySchStatus
(
pSch
);
taosHashRemove
(
mgmt
->
schHash
,
sId
,
sizeof
(
*
sId
));
qError
(
"sch %"
PRIx64
"destroyed"
,
*
sId
);
}
qwReleaseScheduler
(
QW_WRITE
,
mgmt
);
}
}
source/libs/qworker/src/qworker.c
浏览文件 @
8477d22d
...
...
@@ -790,9 +790,10 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
}
QW_ERR_JRET
(
qwAcquireAddScheduler
(
mgmt
,
req
->
sId
,
QW_READ
,
&
sch
));
QW_ERR_JRET
(
qwRegisterHbBrokenLinkArg
(
mgmt
,
req
->
sId
,
&
qwMsg
->
connInfo
));
sch
->
hbBrokenTs
=
0
;
QW_LOCK
(
QW_WRITE
,
&
sch
->
hbConnLock
);
if
(
sch
->
hbConnInfo
.
handle
)
{
...
...
@@ -912,7 +913,7 @@ _return:
}
if
(
taosArrayGetSize
(
pExpiredSch
)
>
0
)
{
qwClearExpiredSch
(
pExpiredSch
);
qwClearExpiredSch
(
mgmt
,
pExpiredSch
);
}
taosMemoryFreeClear
(
rspList
);
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
8477d22d
...
...
@@ -207,6 +207,7 @@ typedef struct SSchJob {
SArray
*
dataSrcTasks
;
// SArray<SQueryTask*>
int32_t
levelIdx
;
SEpSet
dataSrcEps
;
SHashObj
*
taskList
;
SHashObj
*
execTasks
;
// executing tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
8477d22d
...
...
@@ -64,6 +64,13 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN
pJob
->
nodeList
=
taosArrayDup
(
pNodeList
);
}
pJob
->
taskList
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pJob
->
taskList
)
{
SCH_JOB_ELOG
(
"taosHashInit %d taskList failed"
,
pDag
->
numOfSubplans
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_ERR_JRET
(
schValidateAndBuildJob
(
pDag
,
pJob
));
if
(
SCH_IS_EXPLAIN_JOB
(
pJob
))
{
...
...
@@ -486,23 +493,26 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_SET_JOB_TYPE
(
pJob
,
plan
->
subplanType
);
SSchTask
task
=
{
0
};
SSchTask
*
pTask
=
&
task
;
SCH_ERR_JRET
(
schInitTask
(
pJob
,
&
task
,
plan
,
pLevel
));
void
*
p
=
taosArrayPush
(
pLevel
->
subTasks
,
&
task
);
if
(
NULL
==
p
)
{
SSchTask
*
pTask
=
taosArrayPush
(
pLevel
->
subTasks
,
&
task
);
if
(
NULL
==
p
Task
)
{
SCH_TASK_ELOG
(
"taosArrayPush task to level failed, level:%d, taskIdx:%d"
,
pLevel
->
level
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_ERR_JRET
(
schRecordQueryDataSrc
(
pJob
,
p
));
SCH_ERR_JRET
(
schRecordQueryDataSrc
(
pJob
,
p
Task
));
if
(
0
!=
taosHashPut
(
planToTask
,
&
plan
,
POINTER_BYTES
,
&
p
,
POINTER_BYTES
))
{
if
(
0
!=
taosHashPut
(
planToTask
,
&
plan
,
POINTER_BYTES
,
&
p
Task
,
POINTER_BYTES
))
{
SCH_TASK_ELOG
(
"taosHashPut to planToTaks failed, taskIdx:%d"
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
0
!=
taosHashPut
(
pJob
->
taskList
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
),
&
pTask
,
POINTER_BYTES
))
{
SCH_TASK_ELOG
(
"taosHashPut to taskList failed, taskIdx:%d"
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
++
pJob
->
taskNum
;
}
...
...
@@ -1276,15 +1286,11 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
}
int32_t
schGetTaskInJob
(
SSchJob
*
pJob
,
uint64_t
taskId
,
SSchTask
**
pTask
)
{
schGetTaskFromList
(
pJob
->
execTasks
,
taskId
,
pTask
);
schGetTaskFromList
(
pJob
->
taskList
,
taskId
,
pTask
);
if
(
NULL
==
*
pTask
)
{
schGetTaskFromList
(
pJob
->
succTasks
,
taskId
,
pTask
);
if
(
NULL
==
*
pTask
)
{
SCH_JOB_ELOG
(
"task not found in execList & succList, taskId:%"
PRIx64
,
taskId
);
SCH_JOB_ELOG
(
"task not found in job task list, taskId:%"
PRIx64
,
taskId
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1445,6 +1451,7 @@ void schFreeJobImpl(void *job) {
taosHashCleanup
(
pJob
->
execTasks
);
taosHashCleanup
(
pJob
->
failTasks
);
taosHashCleanup
(
pJob
->
succTasks
);
taosHashCleanup
(
pJob
->
taskList
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
...
...
source/libs/stream/src/stream.c
浏览文件 @
8477d22d
...
...
@@ -26,7 +26,7 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
pRunReq
->
streamId
=
pTask
->
streamId
;
pRunReq
->
taskId
=
pTask
->
taskId
;
SRpcMsg
msg
=
{
.
msgType
=
TDMT_
VND
_TASK_RUN
,
.
msgType
=
TDMT_
STREAM
_TASK_RUN
,
.
pCont
=
pRunReq
,
.
contLen
=
sizeof
(
SStreamTaskRunReq
),
};
...
...
source/libs/stream/src/streamMsg.c
浏览文件 @
8477d22d
...
...
@@ -190,9 +190,9 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* dat
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_
VND_TASK_DISPATCH
||
pTask
->
dispatchMsgType
==
TDMT_SND
_TASK_DISPATCH
)
{
if
(
pTask
->
dispatchMsgType
==
TDMT_
STREAM
_TASK_DISPATCH
)
{
qType
=
FETCH_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_
TASK
_DISPATCH_WRITE
)
{
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_
STREAM
_DISPATCH_WRITE
)
{
qType
=
WRITE_QUEUE
;
}
else
{
ASSERT
(
0
);
...
...
source/util/src/terror.c
浏览文件 @
8477d22d
...
...
@@ -451,9 +451,89 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCH_TIMEOUT_ERROR, "Task timeout")
TAOS_DEFINE_ERROR
(
TSDB_CODE_QW_MSG_ERROR
,
"Invalid msg order"
)
// parser
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_TABLE_NOT_EXIST
,
"Table does not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_PERMISSION_DENIED
,
"Permission denied"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTERNAL_ERROR
,
"Parser internal error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_SYNTAX_ERROR
,
"syntax error near"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INCOMPLETE_SQL
,
"Incomplete SQL statement"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_COLUMN
,
"Invalid column name"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_TABLE_NOT_EXIST
,
"Table does not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_AMBIGUOUS_COLUMN
,
"Column ambiguously defined"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
"Invalid value type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION
,
"There mustn't be aggregation"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_WRONG_NUMBER_OF_SELECT
,
"ORDER BY item must be the number of a SELECT-list expression"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION
,
"Not a GROUP BY expression"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION
,
"Not SELECTed expression"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_NOT_SINGLE_GROUP
,
"Not a single-group group function"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_TAGS_NOT_MATCHED
,
"Tags number not matched"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_TAG_NAME
,
"Invalid tag name"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_NAME_OR_PASSWD_TOO_LONG
,
"Name or password too long"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_PASSWD_EMPTY
,
"Password can not be empty"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_PORT
,
"Port should be an integer that is less than 65535 and greater than 0"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_ENDPOINT
,
"Endpoint should be in the format of 'fqdn:port'"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_EXPRIE_STATEMENT
,
"This statement is no longer supported"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL
,
"Interval too small"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_DB_NOT_SPECIFIED
,
"Database not specified"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME
,
"Invalid identifier name"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR
,
"Corresponding super table not in this db"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_RANGE_OPTION
,
"Invalid option"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_STR_OPTION
,
"Invalid option"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_ENUM_OPTION
,
"Invalid option"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_KEEP_NUM
,
"Invalid number of keep options"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_KEEP_ORDER
,
"Invalid keep value, should be keep0 <= keep1 <= keep2"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_KEEP_VALUE
,
"Invalid option keep"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_COMMENT_OPTION
,
"Invalid option comment"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_F_RANGE_OPTION
,
"Invalid option"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_ROLLUP_OPTION
,
"Invalid option rollup: only one function is allowed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION
,
"Invalid option retentions"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST
,
"GROUP BY and WINDOW-clause can't be used together"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_OPTION_UNIT
,
"Invalid option unit: only m, h, d allowed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_KEEP_UNIT
,
"Invalid option keep unit: only m, h, d allowed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_AGG_FUNC_NESTING
,
"Aggregate functions do not support nesting"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE
,
"Only support STATE_WINDOW on integer/bool/varchar column"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_STATE_WIN_COL
,
"Not support STATE_WINDOW on tag column"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE
,
"STATE_WINDOW not support for super table query"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SESSION_GAP
,
"SESSION gap should be fixed time window, and greater than 0"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SESSION_COL
,
"Only support SESSION on primary timestamp column"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_OFFSET_NEGATIVE
,
"Interval offset cannot be negative"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_OFFSET_UNIT
,
"Cannot use 'year' as offset when interval is 'month'"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG
,
"Interval offset should be shorter than interval"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SLIDING_UNIT
,
"Does not support sliding when interval is natural month/year"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG
,
"sliding value no larger than the interval value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL
,
"sliding value can not less than 1% of interval value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_ONLY_ONE_JSON_TAG
,
"Only one tag if there is a json tag"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INCORRECT_NUM_OF_COL
,
"Query block has incorrect number of result columns"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL
,
"Incorrect TIMESTAMP value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_DAYS_VALUE
,
"Invalid days value, should be keep2 >= keep1 >= keep0 >= days"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_OFFSET_LESS_ZERO
,
"soffset/offset can not be less than 0"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY
,
"slimit/soffset only available for PARTITION BY query"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_TOPIC_QUERY
,
"Invalid topic query"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_DROP_STABLE
,
"Cannot drop super table in batch"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE
,
"Start(end) time of query range required or time range too large"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_DUPLICATED_COLUMN
,
"Duplicated column names"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_TAGS_LENGTH
,
"Tags length exceeds max length"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_ROW_LENGTH
,
"Row length exceeds max length"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_COLUMNS_NUM
,
"Illegal number of columns"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_TOO_MANY_COLUMNS
,
"Too many columns"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_FIRST_COLUMN
,
"First column must be timestamp"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
,
"Invalid binary/nchar column length"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_TAGS_NUM
,
"Invalid number of tag columns"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_INTERNAL_PK
,
"Invalid _c0 or _rowts expression"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_TIMELINE_FUNC
,
"Invalid timeline function"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_PASSWD
,
"Invalid password"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_ALTER_TABLE
,
"Invalid alter table statement"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY
,
"Primary timestamp column cannot be dropped"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_MODIFY_COL
,
"Only binary/nchar column length could be modified"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_TBNAME
,
"Invalid tbname pseudo column"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_FUNCTION_NAME
,
"Invalid function name"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_COMMENT_TOO_LONG
,
"Comment too long"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_NOT_ALLOWED_FUNC
,
"Some functions are allowed only in the SELECT list of a query. "
"And, cannot be mixed with other non scalar functions or columns."
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY
,
"Window query not supported, since the result of subquery not include valid timestamp column"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_DROP_COL
,
"No columns can be dropped"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_COL_JSON
,
"Only tag can be json type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_VALUE_TOO_LONG
,
"Value too long for column/tag"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_DELETE_WHERE
,
"The DELETE statement must have a definite time window range"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG
,
"The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes"
)
//planner
TAOS_DEFINE_ERROR
(
TSDB_CODE_PLAN_INTERNAL_ERROR
,
"Planner internal error"
)
...
...
tests/pytest/test.py
浏览文件 @
8477d22d
...
...
@@ -131,7 +131,7 @@ if __name__ == "__main__":
is_test_framework
=
0
key_word
=
'tdCases.addWindows'
try
:
if
key_word
in
open
(
fileName
).
read
():
if
key_word
in
open
(
fileName
,
encoding
=
'UTF-8'
).
read
():
is_test_framework
=
1
except
:
pass
...
...
tests/pytest/util/dnodes.py
浏览文件 @
8477d22d
...
...
@@ -416,7 +416,7 @@ class TDDnode:
psCmd
,
shell
=
True
).
decode
(
"utf-8"
)
if
not
platform
.
system
().
lower
()
==
'windows'
:
for
port
in
range
(
6030
,
6041
):
fuserCmd
=
"fuser -k -n tcp %d"
%
port
fuserCmd
=
"fuser -k -n tcp %d
> /dev/null
"
%
port
os
.
system
(
fuserCmd
)
if
self
.
valgrind
:
time
.
sleep
(
2
)
...
...
tests/system-test/2-query/between.py
浏览文件 @
8477d22d
...
...
@@ -143,7 +143,7 @@ class TDTestCase:
tdSql
.
checkRows
(
9
)
tdSql
.
query
(
"select * from t1 where c5 between 136 and 127"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select * from t1 where c5 between '~' and '
ˆ
'"
)
tdSql
.
query
(
"select * from t1 where c5 between '~' and '
^
'"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select * from t1 where c5 not between 1 and 2"
)
# tdSql.checkRows(0)
...
...
tests/system-test/2-query/cast.py
浏览文件 @
8477d22d
...
...
@@ -565,7 +565,7 @@ class TDTestCase:
if
data_ct4_c10
[
i
]
is
None
:
tdSql
.
checkData
(
i
,
0
,
None
)
else
:
time2str
=
str
(
int
(
datetime
.
datetime
.
timestamp
(
data_ct4_c10
[
i
]
)
*
1000
))
time2str
=
str
(
int
(
(
data_ct4_c10
[
i
]
-
datetime
.
datetime
.
fromtimestamp
(
0
)).
total_seconds
(
)
*
1000
))
tdSql
.
checkData
(
i
,
0
,
time2str
)
tdSql
.
query
(
"select cast(c10 as nchar(32)) as b from t1"
)
for
i
in
range
(
len
(
data_t1_c10
)):
...
...
@@ -574,7 +574,7 @@ class TDTestCase:
elif
i
==
10
:
continue
else
:
time2str
=
str
(
int
(
datetime
.
datetime
.
timestamp
(
data_t1_c10
[
i
]
)
*
1000
))
time2str
=
str
(
int
(
(
data_t1_c10
[
i
]
-
datetime
.
datetime
.
fromtimestamp
(
0
)).
total_seconds
(
)
*
1000
))
tdSql
.
checkData
(
i
,
0
,
time2str
)
tdLog
.
printNoPrefix
(
"==========step38: cast timestamp to binary, expect no changes "
)
...
...
@@ -583,7 +583,7 @@ class TDTestCase:
if
data_ct4_c10
[
i
]
is
None
:
tdSql
.
checkData
(
i
,
0
,
None
)
else
:
time2str
=
str
(
int
(
datetime
.
datetime
.
timestamp
(
data_ct4_c10
[
i
]
)
*
1000
))
time2str
=
str
(
int
(
(
data_ct4_c10
[
i
]
-
datetime
.
datetime
.
fromtimestamp
(
0
)).
total_seconds
(
)
*
1000
))
tdSql
.
checkData
(
i
,
0
,
time2str
)
tdSql
.
query
(
"select cast(c10 as binary(32)) as b from t1"
)
for
i
in
range
(
len
(
data_t1_c10
)):
...
...
@@ -592,7 +592,7 @@ class TDTestCase:
elif
i
==
10
:
continue
else
:
time2str
=
str
(
int
(
datetime
.
datetime
.
timestamp
(
data_t1_c10
[
i
]
)
*
1000
))
time2str
=
str
(
int
(
(
data_t1_c10
[
i
]
-
datetime
.
datetime
.
fromtimestamp
(
0
)).
total_seconds
(
)
*
1000
))
tdSql
.
checkData
(
i
,
0
,
time2str
)
tdLog
.
printNoPrefix
(
"==========step39: cast constant operation to bigint, expect change to int "
)
...
...
tests/system-test/test-all.bat
浏览文件 @
8477d22d
...
...
@@ -13,6 +13,9 @@ echo Linux Taosd Test
for
/F
"usebackq tokens=*"
%%i
in
(
fulltest
.bat
)
do
(
for
/f
"tokens=1* delims= "
%%a
in
(
"
%%i
"
)
do
if
not
"
%%a
"
==
"@REM"
(
echo
Processing
%%i
call
:GetTimeSeconds
%time%
set
time1
=
!_timeTemp!
echo
Start
at
%time%
set
/a
a
+=
1
call
%%i
ARG1
-m
%
1
>
result_
!a!
.txt
2
>
error_
!a!
.txt
if
errorlevel
1
(
call
:colorEcho
0
c
"failed"
&
echo
.
&&
echo
result
:
&&
cat
result_
!a!
.txt
&&
echo
error
:
&&
cat
error_
!a!
.txt
&&
exit
8
)
else
(
call
:colorEcho
0
a
"Success"
&
echo
.
)
...
...
@@ -21,7 +24,33 @@ for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
exit
:colorEcho
call
:GetTimeSeconds
%time%
set
time2
=
%_timeTemp%
set
/a
interTime
=
%time2
%
-
%time1
%
echo
End
at
%time%
,
cast
%interTime%
s
echo
off
<
nul
set
/p
".=
%DEL%
"
>
"
%
~2"
findstr
/v /a
:
%
1
/R
"
^$
"
"
%
~2"
nul
del
"
%
~2"
>
nul
2
>&
1
i
goto
:eof
:GetTimeSeconds
set
tt
=
%
1
set
tt
=
%tt
:.
=
%
set
tt
=
%tt
::
=
%
set
index
=
1
for
%%a
in
(
%tt%
)
do
(
if
!index!
EQU
1
(
set
hh
=
%%a
)
^
else
if
!index!
EQU
2
(
set
mm
=
%%a
)
^
else
if
!index!
EQU
3
(
set
ss
=
%%a
)
set
/a
index
=
index
+
1
)
set
/a
_timeTemp
=(
%hh%
*
60
+
%mm%
)*
60
+
%ss%
goto
:eof
\ No newline at end of file
tests/system-test/test.py
浏览文件 @
8477d22d
...
...
@@ -168,7 +168,7 @@ if __name__ == "__main__":
key_word
=
'tdCases.addWindows'
is_test_framework
=
0
try
:
if
key_word
in
open
(
fileName
).
read
():
if
key_word
in
open
(
fileName
,
encoding
=
'UTF-8'
).
read
():
is_test_framework
=
1
except
:
pass
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录