Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ba5a6484
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ba5a6484
编写于
12月 29, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
12月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19231 from taosdata/merge/mainto3.0_1229
merge from main to 3.0
上级
9f3a7592
2bbf2f3a
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
281 addition
and
111 deletion
+281
-111
include/common/tglobal.h
include/common/tglobal.h
+1
-1
include/util/tworker.h
include/util/tworker.h
+18
-4
source/common/src/tglobal.c
source/common/src/tglobal.c
+5
-9
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+14
-14
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+6
-7
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+9
-9
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-2
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+27
-10
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+27
-21
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+7
-2
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+0
-3
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+1
-0
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+2
-0
source/util/src/tworker.c
source/util/src/tworker.c
+141
-9
tests/script/tsim/dnode/balance1.sim
tests/script/tsim/dnode/balance1.sim
+2
-2
tests/script/tsim/dnode/balance2.sim
tests/script/tsim/dnode/balance2.sim
+2
-2
tests/script/tsim/dnode/balance3.sim
tests/script/tsim/dnode/balance3.sim
+4
-4
tests/script/tsim/dnode/balancex.sim
tests/script/tsim/dnode/balancex.sim
+2
-2
tests/script/tsim/dnode/vnode_clean.sim
tests/script/tsim/dnode/vnode_clean.sim
+8
-8
tests/system-test/compatibilityAllTest.sh
tests/system-test/compatibilityAllTest.sh
+2
-2
未找到文件。
include/common/tglobal.h
浏览文件 @
ba5a6484
...
@@ -55,7 +55,7 @@ extern int32_t tsNumOfMnodeQueryThreads;
...
@@ -55,7 +55,7 @@ extern int32_t tsNumOfMnodeQueryThreads;
extern
int32_t
tsNumOfMnodeFetchThreads
;
extern
int32_t
tsNumOfMnodeFetchThreads
;
extern
int32_t
tsNumOfMnodeReadThreads
;
extern
int32_t
tsNumOfMnodeReadThreads
;
extern
int32_t
tsNumOfVnodeQueryThreads
;
extern
int32_t
tsNumOfVnodeQueryThreads
;
extern
int32_t
tsNum
OfVnodeStreamThreads
;
extern
float
tsRatio
OfVnodeStreamThreads
;
extern
int32_t
tsNumOfVnodeFetchThreads
;
extern
int32_t
tsNumOfVnodeFetchThreads
;
extern
int32_t
tsNumOfVnodeRsmaThreads
;
extern
int32_t
tsNumOfVnodeRsmaThreads
;
extern
int32_t
tsNumOfQnodeQueryThreads
;
extern
int32_t
tsNumOfQnodeQueryThreads
;
...
...
include/util/tworker.h
浏览文件 @
ba5a6484
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
#define _TD_UTIL_WORKER_H_
#define _TD_UTIL_WORKER_H_
#include "tqueue.h"
#include "tqueue.h"
#include "tarray.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
...
@@ -26,10 +27,10 @@ typedef struct SQWorkerPool SQWorkerPool;
...
@@ -26,10 +27,10 @@ typedef struct SQWorkerPool SQWorkerPool;
typedef
struct
SWWorkerPool
SWWorkerPool
;
typedef
struct
SWWorkerPool
SWWorkerPool
;
typedef
struct
SQWorker
{
typedef
struct
SQWorker
{
int32_t
id
;
// worker id
int32_t
id
;
// worker id
int64_t
pid
;
// thread pid
int64_t
pid
;
// thread pid
TdThread
thread
;
// thread id
TdThread
thread
;
// thread id
SQWorkerPool
*
pool
;
void
*
pool
;
}
SQWorker
;
}
SQWorker
;
typedef
struct
SQWorkerPool
{
typedef
struct
SQWorkerPool
{
...
@@ -42,6 +43,14 @@ typedef struct SQWorkerPool {
...
@@ -42,6 +43,14 @@ typedef struct SQWorkerPool {
TdThreadMutex
mutex
;
TdThreadMutex
mutex
;
}
SQWorkerPool
;
}
SQWorkerPool
;
typedef
struct
SAutoQWorkerPool
{
float
ratio
;
STaosQset
*
qset
;
const
char
*
name
;
SArray
*
workers
;
TdThreadMutex
mutex
;
}
SAutoQWorkerPool
;
typedef
struct
SWWorker
{
typedef
struct
SWWorker
{
int32_t
id
;
// worker id
int32_t
id
;
// worker id
int64_t
pid
;
// thread pid
int64_t
pid
;
// thread pid
...
@@ -65,6 +74,11 @@ void tQWorkerCleanup(SQWorkerPool *pool);
...
@@ -65,6 +74,11 @@ void tQWorkerCleanup(SQWorkerPool *pool);
STaosQueue
*
tQWorkerAllocQueue
(
SQWorkerPool
*
pool
,
void
*
ahandle
,
FItem
fp
);
STaosQueue
*
tQWorkerAllocQueue
(
SQWorkerPool
*
pool
,
void
*
ahandle
,
FItem
fp
);
void
tQWorkerFreeQueue
(
SQWorkerPool
*
pool
,
STaosQueue
*
queue
);
void
tQWorkerFreeQueue
(
SQWorkerPool
*
pool
,
STaosQueue
*
queue
);
int32_t
tAutoQWorkerInit
(
SAutoQWorkerPool
*
pool
);
void
tAutoQWorkerCleanup
(
SAutoQWorkerPool
*
pool
);
STaosQueue
*
tAutoQWorkerAllocQueue
(
SAutoQWorkerPool
*
pool
,
void
*
ahandle
,
FItem
fp
);
void
tAutoQWorkerFreeQueue
(
SAutoQWorkerPool
*
pool
,
STaosQueue
*
queue
);
int32_t
tWWorkerInit
(
SWWorkerPool
*
pool
);
int32_t
tWWorkerInit
(
SWWorkerPool
*
pool
);
void
tWWorkerCleanup
(
SWWorkerPool
*
pool
);
void
tWWorkerCleanup
(
SWWorkerPool
*
pool
);
STaosQueue
*
tWWorkerAllocQueue
(
SWWorkerPool
*
pool
,
void
*
ahandle
,
FItems
fp
);
STaosQueue
*
tWWorkerAllocQueue
(
SWWorkerPool
*
pool
,
void
*
ahandle
,
FItems
fp
);
...
...
source/common/src/tglobal.c
浏览文件 @
ba5a6484
...
@@ -47,7 +47,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
...
@@ -47,7 +47,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
int32_t
tsNumOfMnodeFetchThreads
=
1
;
int32_t
tsNumOfMnodeFetchThreads
=
1
;
int32_t
tsNumOfMnodeReadThreads
=
1
;
int32_t
tsNumOfMnodeReadThreads
=
1
;
int32_t
tsNumOfVnodeQueryThreads
=
4
;
int32_t
tsNumOfVnodeQueryThreads
=
4
;
int32_t
tsNumOfVnodeStreamThreads
=
2
;
float
tsRatioOfVnodeStreamThreads
=
1
.
0
;
int32_t
tsNumOfVnodeFetchThreads
=
4
;
int32_t
tsNumOfVnodeFetchThreads
=
4
;
int32_t
tsNumOfVnodeRsmaThreads
=
2
;
int32_t
tsNumOfVnodeRsmaThreads
=
2
;
int32_t
tsNumOfQnodeQueryThreads
=
4
;
int32_t
tsNumOfQnodeQueryThreads
=
4
;
...
@@ -392,9 +392,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -392,9 +392,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
4
);
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeStreamThreads
=
tsNumOfCores
/
4
;
if
(
cfgAddFloat
(
pCfg
,
"ratioOfVnodeStreamThreads"
,
tsRatioOfVnodeStreamThreads
,
0
.
01
,
100
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeStreamThreads
=
TMAX
(
tsNumOfVnodeStreamThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeStreamThreads"
,
tsNumOfVnodeStreamThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeFetchThreads
=
tsNumOfCores
/
4
;
tsNumOfVnodeFetchThreads
=
tsNumOfCores
/
4
;
tsNumOfVnodeFetchThreads
=
TMAX
(
tsNumOfVnodeFetchThreads
,
4
);
tsNumOfVnodeFetchThreads
=
TMAX
(
tsNumOfVnodeFetchThreads
,
4
);
...
@@ -513,11 +511,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
...
@@ -513,11 +511,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem
->
stype
=
stype
;
pItem
->
stype
=
stype
;
}
}
pItem
=
cfgGetItem
(
tsCfg
,
"
num
OfVnodeStreamThreads"
);
pItem
=
cfgGetItem
(
tsCfg
,
"
ratio
OfVnodeStreamThreads"
);
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
tsNumOfVnodeStreamThreads
=
numOfCores
/
4
;
pItem
->
fval
=
tsRatioOfVnodeStreamThreads
;
tsNumOfVnodeStreamThreads
=
TMAX
(
tsNumOfVnodeStreamThreads
,
4
);
pItem
->
i32
=
tsNumOfVnodeStreamThreads
;
pItem
->
stype
=
stype
;
pItem
->
stype
=
stype
;
}
}
...
@@ -710,7 +706,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
...
@@ -710,7 +706,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfCommitThreads
=
cfgGetItem
(
pCfg
,
"numOfCommitThreads"
)
->
i32
;
tsNumOfCommitThreads
=
cfgGetItem
(
pCfg
,
"numOfCommitThreads"
)
->
i32
;
tsNumOfMnodeReadThreads
=
cfgGetItem
(
pCfg
,
"numOfMnodeReadThreads"
)
->
i32
;
tsNumOfMnodeReadThreads
=
cfgGetItem
(
pCfg
,
"numOfMnodeReadThreads"
)
->
i32
;
tsNumOfVnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeQueryThreads"
)
->
i32
;
tsNumOfVnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeQueryThreads"
)
->
i32
;
ts
NumOfVnodeStreamThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeStreamThreads"
)
->
i32
;
ts
RatioOfVnodeStreamThreads
=
cfgGetItem
(
pCfg
,
"ratioOfVnodeStreamThreads"
)
->
fval
;
tsNumOfVnodeFetchThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeFetchThreads"
)
->
i32
;
tsNumOfVnodeFetchThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeFetchThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
tsNumOfQnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeQueryThreads"
)
->
i32
;
tsNumOfQnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeQueryThreads"
)
->
i32
;
...
...
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
ba5a6484
...
@@ -26,20 +26,20 @@ extern "C" {
...
@@ -26,20 +26,20 @@ extern "C" {
#endif
#endif
typedef
struct
SVnodeMgmt
{
typedef
struct
SVnodeMgmt
{
SDnodeData
*
pData
;
SDnodeData
*
pData
;
SMsgCb
msgCb
;
SMsgCb
msgCb
;
const
char
*
path
;
const
char
*
path
;
const
char
*
name
;
const
char
*
name
;
SQWorkerPool
queryPool
;
SQWorkerPool
queryPool
;
S
QWorkerPool
streamPool
;
S
AutoQWorkerPool
streamPool
;
SWWorkerPool
fetchPool
;
SWWorkerPool
fetchPool
;
SSingleWorker
mgmtWorker
;
SSingleWorker
mgmtWorker
;
SHashObj
*
hash
;
SHashObj
*
hash
;
TdThreadRwlock
lock
;
TdThreadRwlock
lock
;
SVnodesStat
state
;
SVnodesStat
state
;
STfs
*
pTfs
;
STfs
*
pTfs
;
TdThread
thread
;
TdThread
thread
;
bool
stop
;
bool
stop
;
}
SVnodeMgmt
;
}
SVnodeMgmt
;
typedef
struct
{
typedef
struct
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
ba5a6484
...
@@ -318,7 +318,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -318,7 +318,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
(
void
)
tMultiWorkerInit
(
&
pVnode
->
pApplyW
,
&
acfg
);
(
void
)
tMultiWorkerInit
(
&
pVnode
->
pApplyW
,
&
acfg
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pStreamQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
streamPool
,
pVnode
,
(
FItem
)
vmProcessStreamQueue
);
pVnode
->
pStreamQ
=
t
Auto
QWorkerAllocQueue
(
&
pMgmt
->
streamPool
,
pVnode
,
(
FItem
)
vmProcessStreamQueue
);
pVnode
->
pFetchQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItems
)
vmProcessFetchQueue
);
pVnode
->
pFetchQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItems
)
vmProcessFetchQueue
);
if
(
pVnode
->
pWriteW
.
queue
==
NULL
||
pVnode
->
pSyncW
.
queue
==
NULL
||
pVnode
->
pSyncCtrlW
.
queue
==
NULL
||
if
(
pVnode
->
pWriteW
.
queue
==
NULL
||
pVnode
->
pSyncW
.
queue
==
NULL
||
pVnode
->
pSyncCtrlW
.
queue
==
NULL
||
...
@@ -344,7 +344,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -344,7 +344,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
void
vmFreeQueue
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
void
vmFreeQueue
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
streamPool
,
pVnode
->
pStreamQ
);
t
Auto
QWorkerFreeQueue
(
&
pMgmt
->
streamPool
,
pVnode
->
pStreamQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
pVnode
->
pQueryQ
=
NULL
;
pVnode
->
pQueryQ
=
NULL
;
pVnode
->
pStreamQ
=
NULL
;
pVnode
->
pStreamQ
=
NULL
;
...
@@ -359,11 +359,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
...
@@ -359,11 +359,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pQPool
->
max
=
tsNumOfVnodeQueryThreads
;
pQPool
->
max
=
tsNumOfVnodeQueryThreads
;
if
(
tQWorkerInit
(
pQPool
)
!=
0
)
return
-
1
;
if
(
tQWorkerInit
(
pQPool
)
!=
0
)
return
-
1
;
SQWorkerPool
*
pStreamPool
=
&
pMgmt
->
streamPool
;
S
Auto
QWorkerPool
*
pStreamPool
=
&
pMgmt
->
streamPool
;
pStreamPool
->
name
=
"vnode-stream"
;
pStreamPool
->
name
=
"vnode-stream"
;
pStreamPool
->
min
=
tsNumOfVnodeStreamThreads
;
pStreamPool
->
ratio
=
tsRatioOfVnodeStreamThreads
;
pStreamPool
->
max
=
tsNumOfVnodeStreamThreads
;
if
(
tAutoQWorkerInit
(
pStreamPool
)
!=
0
)
return
-
1
;
if
(
tQWorkerInit
(
pStreamPool
)
!=
0
)
return
-
1
;
SWWorkerPool
*
pFPool
=
&
pMgmt
->
fetchPool
;
SWWorkerPool
*
pFPool
=
&
pMgmt
->
fetchPool
;
pFPool
->
name
=
"vnode-fetch"
;
pFPool
->
name
=
"vnode-fetch"
;
...
@@ -385,7 +384,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
...
@@ -385,7 +384,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
void
vmStopWorker
(
SVnodeMgmt
*
pMgmt
)
{
void
vmStopWorker
(
SVnodeMgmt
*
pMgmt
)
{
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tQWorkerCleanup
(
&
pMgmt
->
streamPool
);
t
Auto
QWorkerCleanup
(
&
pMgmt
->
streamPool
);
tWWorkerCleanup
(
&
pMgmt
->
fetchPool
);
tWWorkerCleanup
(
&
pMgmt
->
fetchPool
);
dDebug
(
"vnode workers are closed"
);
dDebug
(
"vnode workers are closed"
);
}
}
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
ba5a6484
...
@@ -111,12 +111,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
...
@@ -111,12 +111,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dGError
(
"msg:%p, type:%s pCont is NULL"
,
pRpc
,
TMSG_INFO
(
pRpc
->
msgType
));
dGError
(
"msg:%p, type:%s pCont is NULL"
,
pRpc
,
TMSG_INFO
(
pRpc
->
msgType
));
terrno
=
TSDB_CODE_INVALID_MSG_LEN
;
terrno
=
TSDB_CODE_INVALID_MSG_LEN
;
goto
_OVER
;
goto
_OVER
;
}
/*
else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
}
else
if
((
pRpc
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
pRpc
->
code
==
TSDB_CODE_RPC_BROKEN_LINK
)
&&
(!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
(
!
IsReq
(
pRpc
))
&&
(
pRpc
->
pCont
==
NULL
))
{
dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
dGError
(
"msg:%p, type:%s pCont is NULL, err: %s"
,
pRpc
,
TMSG_INFO
(
pRpc
->
msgType
),
tstrerror
(
pRpc
->
code
));
terrno = pRpc->code;
terrno
=
pRpc
->
code
;
goto _OVER;
goto
_OVER
;
}*/
}
if
(
pHandle
->
defaultNtype
==
NODE_END
)
{
if
(
pHandle
->
defaultNtype
==
NODE_END
)
{
dGError
(
"msg:%p, type:%s not processed since no handle"
,
pRpc
,
TMSG_INFO
(
pRpc
->
msgType
));
dGError
(
"msg:%p, type:%s not processed since no handle"
,
pRpc
,
TMSG_INFO
(
pRpc
->
msgType
));
...
@@ -248,9 +248,9 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe
...
@@ -248,9 +248,9 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe
static
bool
rpcRfp
(
int32_t
code
,
tmsg_t
msgType
)
{
static
bool
rpcRfp
(
int32_t
code
,
tmsg_t
msgType
)
{
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
code
==
TSDB_CODE_RPC_BROKEN_LINK
||
code
==
TSDB_CODE_MNODE_NOT_FOUND
||
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
code
==
TSDB_CODE_RPC_BROKEN_LINK
||
code
==
TSDB_CODE_MNODE_NOT_FOUND
||
code
==
TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
||
code
==
TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
||
code
==
TSDB_CODE_SYN_NOT_LEADER
||
code
==
TSDB_CODE_SYN_
NOT_LEADER
||
code
==
TSDB_CODE_SYN_RESTORING
||
code
==
TSDB_CODE_VND_STOPPED
||
code
==
TSDB_CODE_SYN_
RESTORING
||
code
==
TSDB_CODE_VND_STOPPED
||
code
==
TSDB_CODE_APP_IS_STARTING
||
code
==
TSDB_CODE_APP_IS_ST
ARTING
||
code
==
TSDB_CODE_APP_IS_ST
OPPING
)
{
code
==
TSDB_CODE_APP_IS_STOPPING
)
{
if
(
msgType
==
TDMT_SCH_QUERY
||
msgType
==
TDMT_SCH_MERGE_QUERY
||
msgType
==
TDMT_SCH_FETCH
||
if
(
msgType
==
TDMT_SCH_QUERY
||
msgType
==
TDMT_SCH_MERGE_QUERY
||
msgType
==
TDMT_SCH_FETCH
||
msgType
==
TDMT_SCH_MERGE_FETCH
)
{
msgType
==
TDMT_SCH_MERGE_FETCH
)
{
return
false
;
return
false
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
ba5a6484
...
@@ -193,6 +193,7 @@ typedef struct {
...
@@ -193,6 +193,7 @@ typedef struct {
int64_t
lastAccessTime
;
int64_t
lastAccessTime
;
int32_t
accessTimes
;
int32_t
accessTimes
;
int32_t
numOfVnodes
;
int32_t
numOfVnodes
;
int32_t
numOfOtherNodes
;
int32_t
numOfSupportVnodes
;
int32_t
numOfSupportVnodes
;
float
numOfCores
;
float
numOfCores
;
int64_t
memTotal
;
int64_t
memTotal
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
ba5a6484
...
@@ -397,8 +397,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
...
@@ -397,8 +397,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
bool
reboot
=
(
pDnode
->
rebootTime
!=
statusReq
.
rebootTime
);
bool
reboot
=
(
pDnode
->
rebootTime
!=
statusReq
.
rebootTime
);
bool
needCheck
=
!
online
||
dnodeChanged
||
reboot
;
bool
needCheck
=
!
online
||
dnodeChanged
||
reboot
;
pDnode
->
accessTimes
++
;
pDnode
->
lastAccessTime
=
curMs
;
const
STraceId
*
trace
=
&
pReq
->
info
.
traceId
;
const
STraceId
*
trace
=
&
pReq
->
info
.
traceId
;
mGTrace
(
"dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d"
,
pDnode
->
id
,
mGTrace
(
"dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d"
,
pDnode
->
id
,
pDnode
->
accessTimes
,
needCheck
,
online
,
reboot
,
dnodeChanged
,
statusReq
.
statusSeq
);
pDnode
->
accessTimes
,
needCheck
,
online
,
reboot
,
dnodeChanged
,
statusReq
.
statusSeq
);
...
@@ -534,6 +532,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
...
@@ -534,6 +532,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pReq
->
info
.
rsp
=
pHead
;
pReq
->
info
.
rsp
=
pHead
;
}
}
pDnode
->
accessTimes
++
;
pDnode
->
lastAccessTime
=
curMs
;
code
=
0
;
code
=
0
;
_OVER:
_OVER:
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
ba5a6484
...
@@ -425,6 +425,7 @@ void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgOb
...
@@ -425,6 +425,7 @@ void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgOb
static
bool
mndResetDnodesArrayFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
static
bool
mndResetDnodesArrayFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
SDnodeObj
*
pDnode
=
pObj
;
SDnodeObj
*
pDnode
=
pObj
;
pDnode
->
numOfVnodes
=
0
;
pDnode
->
numOfVnodes
=
0
;
pDnode
->
numOfOtherNodes
=
0
;
return
true
;
return
true
;
}
}
...
@@ -447,7 +448,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
...
@@ -447,7 +448,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
pDnode
->
numOfVnodes
,
pDnode
->
numOfSupportVnodes
,
isMnode
,
online
,
pDnode
->
memAvail
,
pDnode
->
memUsed
);
pDnode
->
numOfVnodes
,
pDnode
->
numOfSupportVnodes
,
isMnode
,
online
,
pDnode
->
memAvail
,
pDnode
->
memUsed
);
if
(
isMnode
)
{
if
(
isMnode
)
{
pDnode
->
numOf
Vn
odes
++
;
pDnode
->
numOf
OtherN
odes
++
;
}
}
if
(
online
&&
pDnode
->
numOfSupportVnodes
>
0
)
{
if
(
online
&&
pDnode
->
numOfSupportVnodes
>
0
)
{
...
@@ -468,14 +469,25 @@ SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) {
...
@@ -468,14 +469,25 @@ SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) {
sdbTraverse
(
pSdb
,
SDB_DNODE
,
mndResetDnodesArrayFp
,
NULL
,
NULL
,
NULL
);
sdbTraverse
(
pSdb
,
SDB_DNODE
,
mndResetDnodesArrayFp
,
NULL
,
NULL
,
NULL
);
sdbTraverse
(
pSdb
,
SDB_DNODE
,
mndBuildDnodesArrayFp
,
pArray
,
&
exceptDnodeId
,
NULL
);
sdbTraverse
(
pSdb
,
SDB_DNODE
,
mndBuildDnodesArrayFp
,
pArray
,
&
exceptDnodeId
,
NULL
);
mDebug
(
"build %d dnodes array"
,
(
int32_t
)
taosArrayGetSize
(
pArray
));
for
(
int32_t
i
=
0
;
i
<
(
int32_t
)
taosArrayGetSize
(
pArray
);
++
i
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
mDebug
(
"dnode:%d, vnodes:%d others:%d"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
,
pDnode
->
numOfOtherNodes
);
}
return
pArray
;
return
pArray
;
}
}
static
int32_t
mndCompareDnodeId
(
int32_t
*
dnode1Id
,
int32_t
*
dnode2Id
)
{
return
*
dnode1Id
>=
*
dnode2Id
?
1
:
0
;
}
static
int32_t
mndCompareDnodeId
(
int32_t
*
dnode1Id
,
int32_t
*
dnode2Id
)
{
return
*
dnode1Id
>=
*
dnode2Id
?
1
:
0
;
}
static
float
mndGetDnodeScore
(
SDnodeObj
*
pDnode
,
int32_t
additionDnodes
,
float
ratio
)
{
float
totalDnodes
=
pDnode
->
numOfVnodes
+
(
float
)
pDnode
->
numOfOtherNodes
*
ratio
+
additionDnodes
;
return
totalDnodes
/
pDnode
->
numOfSupportVnodes
;
}
static
int32_t
mndCompareDnodeVnodes
(
SDnodeObj
*
pDnode1
,
SDnodeObj
*
pDnode2
)
{
static
int32_t
mndCompareDnodeVnodes
(
SDnodeObj
*
pDnode1
,
SDnodeObj
*
pDnode2
)
{
float
d1Score
=
(
float
)
pDnode1
->
numOfVnodes
/
pDnode1
->
numOfSupportVnodes
;
float
d1Score
=
mndGetDnodeScore
(
pDnode1
,
0
,
0
.
9
)
;
float
d2Score
=
(
float
)
pDnode2
->
numOfVnodes
/
pDnode2
->
numOfSupportVnodes
;
float
d2Score
=
mndGetDnodeScore
(
pDnode2
,
0
,
0
.
9
)
;
return
d1Score
>=
d2Score
?
1
:
0
;
return
d1Score
>=
d2Score
?
1
:
0
;
}
}
...
@@ -494,7 +506,12 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup
...
@@ -494,7 +506,12 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup
int32_t
allocedVnodes
=
0
;
int32_t
allocedVnodes
=
0
;
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
mDebug
(
"start to sort %d dnodes"
,
(
int32_t
)
taosArrayGetSize
(
pArray
));
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
for
(
int32_t
i
=
0
;
i
<
(
int32_t
)
taosArrayGetSize
(
pArray
);
++
i
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
mDebug
(
"dnode:%d, score:%f"
,
pDnode
->
id
,
mndGetDnodeScore
(
pDnode
,
0
,
0
.
9
));
}
int32_t
size
=
taosArrayGetSize
(
pArray
);
int32_t
size
=
taosArrayGetSize
(
pArray
);
if
(
size
<
pVgroup
->
replica
)
{
if
(
size
<
pVgroup
->
replica
)
{
...
@@ -875,7 +892,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro
...
@@ -875,7 +892,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
mInfo
(
"dnode:%d, equivalent vnodes:%d
"
,
pDnode
->
id
,
pDnode
->
numOfVn
odes
);
mInfo
(
"dnode:%d, equivalent vnodes:%d
others:%d"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
,
pDnode
->
numOfOtherN
odes
);
}
}
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
];
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
];
...
@@ -935,7 +952,7 @@ static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *
...
@@ -935,7 +952,7 @@ static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
mInfo
(
"dnode:%d, equivalent vnodes:%d
"
,
pDnode
->
id
,
pDnode
->
numOfVn
odes
);
mInfo
(
"dnode:%d, equivalent vnodes:%d
others:%d"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
,
pDnode
->
numOfOtherN
odes
);
}
}
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
...
@@ -1970,16 +1987,16 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
...
@@ -1970,16 +1987,16 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
mInfo
(
"dnode:%d, equivalent vnodes:%d support:%d, score:%f"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
,
mInfo
(
"dnode:%d, equivalent vnodes:%d
others:%d
support:%d, score:%f"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
,
pDnode
->
numOfSupportVnodes
,
(
float
)
pDnode
->
numOfVnodes
/
pDnode
->
numOfSupportVnodes
);
pDnode
->
numOfSupportVnodes
,
pDnode
->
numOfOtherNodes
,
mndGetDnodeScore
(
pDnode
,
0
,
1
)
);
}
}
SDnodeObj
*
pSrc
=
taosArrayGet
(
pArray
,
taosArrayGetSize
(
pArray
)
-
1
);
SDnodeObj
*
pSrc
=
taosArrayGet
(
pArray
,
taosArrayGetSize
(
pArray
)
-
1
);
SDnodeObj
*
pDst
=
taosArrayGet
(
pArray
,
0
);
SDnodeObj
*
pDst
=
taosArrayGet
(
pArray
,
0
);
float
srcScore
=
(
float
)(
pSrc
->
numOfVnodes
-
1
)
/
pSrc
->
numOfSupportVnodes
;
float
srcScore
=
mndGetDnodeScore
(
pSrc
,
-
1
,
1
)
;
float
dstScore
=
(
float
)(
pDst
->
numOfVnodes
+
1
)
/
pDst
->
numOfSupportVnodes
;
float
dstScore
=
mndGetDnodeScore
(
pDst
,
1
,
1
)
;
mInfo
(
"trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f"
,
pTrans
->
id
,
pSrc
->
id
,
src
Score
,
mInfo
(
"trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f"
,
pTrans
->
id
,
pSrc
->
id
,
dst
Score
,
pDst
->
id
,
dstScore
);
pDst
->
id
,
dstScore
);
if
(
srcScore
>
dstScore
-
0
.
000001
)
{
if
(
srcScore
>
dstScore
-
0
.
000001
)
{
...
...
source/libs/function/src/tudf.c
浏览文件 @
ba5a6484
...
@@ -841,36 +841,42 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
...
@@ -841,36 +841,42 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
numOfRows
=
(
input
[
i
].
numOfRows
>
numOfRows
)
?
input
[
i
].
numOfRows
:
numOfRows
;
numOfRows
=
(
input
[
i
].
numOfRows
>
numOfRows
)
?
input
[
i
].
numOfRows
:
numOfRows
;
}
}
output
->
info
.
rows
=
numOfRows
;
output
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
// create the basic block info structure
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
if
((
input
+
i
)
->
numOfRows
<
numOfRows
)
{
SColumnInfoData
*
pInfo
=
input
[
i
].
columnData
;
SColumnInfoData
*
pColInfoData
=
(
input
+
i
)
->
columnData
;
SColumnInfoData
d
=
{
0
};
int32_t
startRow
=
(
input
+
i
)
->
numOfRows
;
d
.
info
=
pInfo
->
info
;
int32_t
expandRows
=
numOfRows
-
startRow
;
colInfoDataEnsureCapacity
(
pColInfoData
,
numOfRows
,
false
);
blockDataAppendColInfo
(
output
,
&
d
);
}
blockDataEnsureCapacity
(
output
,
numOfRows
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pDest
=
taosArrayGet
(
output
->
pDataBlock
,
i
);
SColumnInfoData
*
pColInfoData
=
input
[
i
].
columnData
;
colDataAssign
(
pDest
,
pColInfoData
,
input
[
i
].
numOfRows
,
&
output
->
info
);
if
(
input
[
i
].
numOfRows
<
numOfRows
)
{
int32_t
startRow
=
input
[
i
].
numOfRows
;
int
expandRows
=
numOfRows
-
startRow
;
bool
isNull
=
colDataIsNull_s
(
pColInfoData
,
(
input
+
i
)
->
numOfRows
-
1
);
bool
isNull
=
colDataIsNull_s
(
pColInfoData
,
(
input
+
i
)
->
numOfRows
-
1
);
if
(
isNull
)
{
if
(
isNull
)
{
colDataAppendNNULL
(
p
ColInfoData
,
startRow
,
expandRows
);
colDataAppendNNULL
(
p
Dest
,
startRow
,
expandRows
);
}
else
{
}
else
{
char
*
src
=
colDataGetData
(
pColInfoData
,
(
input
+
i
)
->
numOfRows
-
1
);
char
*
src
=
colDataGetData
(
pColInfoData
,
(
input
+
i
)
->
numOfRows
-
1
);
int32_t
bytes
=
pColInfoData
->
info
.
bytes
;
char
*
data
=
taosMemoryMalloc
(
bytes
);
memcpy
(
data
,
src
,
bytes
);
for
(
int
j
=
0
;
j
<
expandRows
;
++
j
)
{
for
(
int
j
=
0
;
j
<
expandRows
;
++
j
)
{
colDataAppend
(
p
ColInfoData
,
startRow
+
j
,
data
,
false
);
colDataAppend
(
p
Dest
,
startRow
+
j
,
src
,
false
);
}
}
//colDataAppendNItems(pColInfoData, startRow, data, expandRows);
//colDataAppendNItems(pColInfoData, startRow, data, expandRows);
taosMemoryFree
(
data
);
}
}
}
}
}
taosArrayPush
(
output
->
pDataBlock
,
(
input
+
i
)
->
columnData
)
;
output
->
info
.
rows
=
numOfRows
;
if
(
IS_VAR_DATA_TYPE
((
input
+
i
)
->
columnData
->
info
.
type
))
{
output
->
info
.
hasVarCol
=
true
;
}
}
return
0
;
return
0
;
}
}
...
@@ -1824,8 +1830,8 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
...
@@ -1824,8 +1830,8 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
convertDataBlockToScalarParm
(
&
resultBlock
,
output
);
convertDataBlockToScalarParm
(
&
resultBlock
,
output
);
taosArrayDestroy
(
resultBlock
.
pDataBlock
);
taosArrayDestroy
(
resultBlock
.
pDataBlock
);
}
}
taosArrayDestroy
(
inputBlock
.
pData
Block
);
blockDataFreeRes
(
&
input
Block
);
return
err
;
return
err
;
}
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
ba5a6484
...
@@ -5703,7 +5703,8 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCrea
...
@@ -5703,7 +5703,8 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCrea
return
code
;
return
code
;
}
}
static
int32_t
checkStreamQuery
(
STranslateContext
*
pCxt
,
SSelectStmt
*
pSelect
)
{
static
int32_t
checkStreamQuery
(
STranslateContext
*
pCxt
,
SCreateStreamStmt
*
pStmt
)
{
SSelectStmt
*
pSelect
=
(
SSelectStmt
*
)
pStmt
->
pQuery
;
if
(
TSDB_DATA_TYPE_TIMESTAMP
!=
((
SExprNode
*
)
nodesListGetNode
(
pSelect
->
pProjectionList
,
0
))
->
resType
.
type
||
if
(
TSDB_DATA_TYPE_TIMESTAMP
!=
((
SExprNode
*
)
nodesListGetNode
(
pSelect
->
pProjectionList
,
0
))
->
resType
.
type
||
!
pSelect
->
isTimeLineResult
||
crossTableWithoutAggOper
(
pSelect
)
||
NULL
!=
pSelect
->
pOrderByList
||
!
pSelect
->
isTimeLineResult
||
crossTableWithoutAggOper
(
pSelect
)
||
NULL
!=
pSelect
->
pOrderByList
||
crossTableWithUdaf
(
pSelect
))
{
crossTableWithUdaf
(
pSelect
))
{
...
@@ -5713,6 +5714,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
...
@@ -5713,6 +5714,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
,
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
,
"SUBTABLE expression must be of VARCHAR type"
);
"SUBTABLE expression must be of VARCHAR type"
);
}
}
if
(
NULL
==
pSelect
->
pWindow
&&
STREAM_TRIGGER_AT_ONCE
!=
pStmt
->
pOptions
->
triggerType
)
{
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
,
"The trigger mode of non window query can only be AT_ONCE"
);
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -5726,7 +5731,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
...
@@ -5726,7 +5731,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
code
=
translateQuery
(
pCxt
,
pStmt
->
pQuery
);
code
=
translateQuery
(
pCxt
,
pStmt
->
pQuery
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkStreamQuery
(
pCxt
,
(
SSelectStmt
*
)
pStmt
->
pQuery
);
code
=
checkStreamQuery
(
pCxt
,
pStmt
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
getSourceDatabase
(
pStmt
->
pQuery
,
pCxt
->
pParseCxt
->
acctId
,
pReq
->
sourceDB
);
getSourceDatabase
(
pStmt
->
pQuery
,
pCxt
->
pParseCxt
->
acctId
,
pReq
->
sourceDB
);
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
ba5a6484
...
@@ -487,8 +487,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
...
@@ -487,8 +487,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
taosMemoryFree
(
pReqs
);
taosMemoryFree
(
pReqs
);
}
}
return
code
;
return
code
;
}
else
{
ASSERT
(
0
);
}
}
return
0
;
return
0
;
}
}
...
@@ -514,7 +512,6 @@ int32_t streamDispatch(SStreamTask* pTask) {
...
@@ -514,7 +512,6 @@ int32_t streamDispatch(SStreamTask* pTask) {
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
streamDispatchAllBlocks
(
pTask
,
pBlock
)
<
0
)
{
if
(
streamDispatchAllBlocks
(
pTask
,
pBlock
)
<
0
)
{
ASSERT
(
0
);
code
=
-
1
;
code
=
-
1
;
streamQueueProcessFail
(
pTask
->
outputQueue
);
streamQueueProcessFail
(
pTask
->
outputQueue
);
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
ba5a6484
...
@@ -143,6 +143,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
...
@@ -143,6 +143,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
ASSERT
(
left
>=
0
);
ASSERT
(
left
>=
0
);
if
(
left
==
0
)
{
if
(
left
==
0
)
{
taosArrayDestroy
(
pTask
->
checkReqIds
);
taosArrayDestroy
(
pTask
->
checkReqIds
);
pTask
->
checkReqIds
=
NULL
;
streamTaskLaunchRecover
(
pTask
,
version
);
streamTaskLaunchRecover
(
pTask
,
version
);
}
}
}
else
if
(
pTask
->
outputType
==
TASK_OUTPUT__FIXED_DISPATCH
)
{
}
else
if
(
pTask
->
outputType
==
TASK_OUTPUT__FIXED_DISPATCH
)
{
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
ba5a6484
...
@@ -175,6 +175,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
...
@@ -175,6 +175,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
}
}
if
(
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
if
(
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
taosArrayDestroy
(
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
);
taosArrayDestroy
(
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
);
taosArrayDestroy
(
pTask
->
checkReqIds
);
pTask
->
checkReqIds
=
NULL
;
}
}
if
(
pTask
->
pState
)
streamStateClose
(
pTask
->
pState
);
if
(
pTask
->
pState
)
streamStateClose
(
pTask
->
pState
);
...
...
source/util/src/tworker.c
浏览文件 @
ba5a6484
...
@@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
...
@@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
worker
->
pool
=
pool
;
worker
->
pool
=
pool
;
}
}
u
Debug
(
"worker:%s is initialized, min:%d max:%d"
,
pool
->
name
,
pool
->
min
,
pool
->
max
);
u
Info
(
"worker:%s is initialized, min:%d max:%d"
,
pool
->
name
,
pool
->
min
,
pool
->
max
);
return
0
;
return
0
;
}
}
...
@@ -51,8 +51,10 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
...
@@ -51,8 +51,10 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
for
(
int32_t
i
=
0
;
i
<
pool
->
max
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pool
->
max
;
++
i
)
{
SQWorker
*
worker
=
pool
->
workers
+
i
;
SQWorker
*
worker
=
pool
->
workers
+
i
;
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
uInfo
(
"worker:%s:%d is stopping"
,
pool
->
name
,
worker
->
id
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadClear
(
&
worker
->
thread
);
taosThreadClear
(
&
worker
->
thread
);
uInfo
(
"worker:%s:%d is stopped"
,
pool
->
name
,
worker
->
id
);
}
}
}
}
...
@@ -60,7 +62,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
...
@@ -60,7 +62,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
taosCloseQset
(
pool
->
qset
);
taosCloseQset
(
pool
->
qset
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
u
Debug
(
"worker:%s is closed"
,
pool
->
name
);
u
Info
(
"worker:%s is closed"
,
pool
->
name
);
}
}
static
void
*
tQWorkerThreadFp
(
SQWorker
*
worker
)
{
static
void
*
tQWorkerThreadFp
(
SQWorker
*
worker
)
{
...
@@ -119,7 +121,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
...
@@ -119,7 +121,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
taosThreadAttrDestroy
(
&
thAttr
);
taosThreadAttrDestroy
(
&
thAttr
);
pool
->
num
++
;
pool
->
num
++
;
u
Debug
(
"worker:%s:%d is launched, total:%d"
,
pool
->
name
,
worker
->
id
,
pool
->
num
);
u
Info
(
"worker:%s:%d is launched, total:%d"
,
pool
->
name
,
worker
->
id
,
pool
->
num
);
}
while
(
pool
->
num
<
pool
->
min
);
}
while
(
pool
->
num
<
pool
->
min
);
}
}
...
@@ -130,7 +132,134 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
...
@@ -130,7 +132,134 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
}
}
void
tQWorkerFreeQueue
(
SQWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
void
tQWorkerFreeQueue
(
SQWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
uDebug
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
uInfo
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
taosCloseQueue
(
queue
);
}
int32_t
tAutoQWorkerInit
(
SAutoQWorkerPool
*
pool
)
{
pool
->
qset
=
taosOpenQset
();
pool
->
workers
=
taosArrayInit
(
2
,
sizeof
(
SQWorker
*
));
if
(
pool
->
workers
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
(
void
)
taosThreadMutexInit
(
&
pool
->
mutex
,
NULL
);
uInfo
(
"worker:%s is initialized as auto"
,
pool
->
name
);
return
0
;
}
void
tAutoQWorkerCleanup
(
SAutoQWorkerPool
*
pool
)
{
int32_t
size
=
taosArrayGetSize
(
pool
->
workers
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SQWorker
*
worker
=
taosArrayGetP
(
pool
->
workers
,
i
);
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
taosQsetThreadResume
(
pool
->
qset
);
}
}
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SQWorker
*
worker
=
taosArrayGetP
(
pool
->
workers
,
i
);
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
uInfo
(
"worker:%s:%d is stopping"
,
pool
->
name
,
worker
->
id
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadClear
(
&
worker
->
thread
);
uInfo
(
"worker:%s:%d is stopped"
,
pool
->
name
,
worker
->
id
);
}
taosMemoryFree
(
worker
);
}
taosArrayDestroy
(
pool
->
workers
);
taosCloseQset
(
pool
->
qset
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
uInfo
(
"worker:%s is closed"
,
pool
->
name
);
}
static
void
*
tAutoQWorkerThreadFp
(
SQWorker
*
worker
)
{
SAutoQWorkerPool
*
pool
=
worker
->
pool
;
SQueueInfo
qinfo
=
{
0
};
void
*
msg
=
NULL
;
int32_t
code
=
0
;
taosBlockSIGPIPE
();
setThreadName
(
pool
->
name
);
worker
->
pid
=
taosGetSelfPthreadId
();
uInfo
(
"worker:%s:%d is running, thread:%08"
PRId64
,
pool
->
name
,
worker
->
id
,
worker
->
pid
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
pool
->
qset
,
(
void
**
)
&
msg
,
&
qinfo
)
==
0
)
{
uInfo
(
"worker:%s:%d qset:%p, got no message and exiting, thread:%08"
PRId64
,
pool
->
name
,
worker
->
id
,
pool
->
qset
,
worker
->
pid
);
break
;
}
if
(
qinfo
.
fp
!=
NULL
)
{
qinfo
.
workerId
=
worker
->
id
;
qinfo
.
threadNum
=
taosArrayGetSize
(
pool
->
workers
);
(
*
((
FItem
)
qinfo
.
fp
))(
&
qinfo
,
msg
);
}
taosUpdateItemSize
(
qinfo
.
queue
,
1
);
}
return
NULL
;
}
STaosQueue
*
tAutoQWorkerAllocQueue
(
SAutoQWorkerPool
*
pool
,
void
*
ahandle
,
FItem
fp
)
{
STaosQueue
*
queue
=
taosOpenQueue
();
if
(
queue
==
NULL
)
return
NULL
;
taosThreadMutexLock
(
&
pool
->
mutex
);
taosSetQueueFp
(
queue
,
fp
,
NULL
);
taosAddIntoQset
(
pool
->
qset
,
queue
,
ahandle
);
int32_t
queueNum
=
taosGetQueueNumber
(
pool
->
qset
);
int32_t
curWorkerNum
=
taosArrayGetSize
(
pool
->
workers
);
int32_t
dstWorkerNum
=
ceil
(
queueNum
*
pool
->
ratio
);
if
(
dstWorkerNum
<
1
)
dstWorkerNum
=
1
;
// spawn a thread to process queue
while
(
curWorkerNum
<
dstWorkerNum
)
{
SQWorker
*
worker
=
taosMemoryCalloc
(
1
,
sizeof
(
SQWorker
));
if
(
worker
==
NULL
||
taosArrayPush
(
pool
->
workers
,
&
worker
)
==
NULL
)
{
uError
(
"worker:%s:%d failed to create"
,
pool
->
name
,
curWorkerNum
);
taosMemoryFree
(
worker
);
taosCloseQueue
(
queue
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
worker
->
id
=
curWorkerNum
;
worker
->
pool
=
pool
;
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
taosThreadCreate
(
&
worker
->
thread
,
&
thAttr
,
(
ThreadFp
)
tAutoQWorkerThreadFp
,
worker
)
!=
0
)
{
uError
(
"worker:%s:%d failed to create thread, total:%d"
,
pool
->
name
,
worker
->
id
,
curWorkerNum
);
(
void
)
taosArrayPop
(
pool
->
workers
);
taosMemoryFree
(
worker
);
taosCloseQueue
(
queue
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
taosThreadAttrDestroy
(
&
thAttr
);
uInfo
(
"worker:%s:%d is launched, total:%d"
,
pool
->
name
,
worker
->
id
,
(
int32_t
)
taosArrayGetSize
(
pool
->
workers
));
curWorkerNum
++
;
}
taosThreadMutexUnlock
(
&
pool
->
mutex
);
uInfo
(
"worker:%s, queue:%p is allocated, ahandle:%p"
,
pool
->
name
,
queue
,
ahandle
);
return
queue
;
}
void
tAutoQWorkerFreeQueue
(
SAutoQWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
uInfo
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
taosCloseQueue
(
queue
);
taosCloseQueue
(
queue
);
}
}
...
@@ -152,7 +281,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) {
...
@@ -152,7 +281,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) {
worker
->
pool
=
pool
;
worker
->
pool
=
pool
;
}
}
u
Debug
(
"worker:%s is initialized, max:%d"
,
pool
->
name
,
pool
->
max
);
u
Info
(
"worker:%s is initialized, max:%d"
,
pool
->
name
,
pool
->
max
);
return
0
;
return
0
;
}
}
...
@@ -169,17 +298,19 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
...
@@ -169,17 +298,19 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
for
(
int32_t
i
=
0
;
i
<
pool
->
max
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pool
->
max
;
++
i
)
{
SWWorker
*
worker
=
pool
->
workers
+
i
;
SWWorker
*
worker
=
pool
->
workers
+
i
;
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
if
(
taosCheckPthreadValid
(
worker
->
thread
))
{
uInfo
(
"worker:%s:%d is stopping"
,
pool
->
name
,
worker
->
id
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadJoin
(
worker
->
thread
,
NULL
);
taosThreadClear
(
&
worker
->
thread
);
taosThreadClear
(
&
worker
->
thread
);
taosFreeQall
(
worker
->
qall
);
taosFreeQall
(
worker
->
qall
);
taosCloseQset
(
worker
->
qset
);
taosCloseQset
(
worker
->
qset
);
uInfo
(
"worker:%s:%d is stopped"
,
pool
->
name
,
worker
->
id
);
}
}
}
}
taosMemoryFreeClear
(
pool
->
workers
);
taosMemoryFreeClear
(
pool
->
workers
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
taosThreadMutexDestroy
(
&
pool
->
mutex
);
u
Debug
(
"worker:%s is closed"
,
pool
->
name
);
u
Info
(
"worker:%s is closed"
,
pool
->
name
);
}
}
static
void
*
tWWorkerThreadFp
(
SWWorker
*
worker
)
{
static
void
*
tWWorkerThreadFp
(
SWWorker
*
worker
)
{
...
@@ -235,7 +366,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
...
@@ -235,7 +366,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
taosThreadCreate
(
&
worker
->
thread
,
&
thAttr
,
(
ThreadFp
)
tWWorkerThreadFp
,
worker
)
!=
0
)
goto
_OVER
;
if
(
taosThreadCreate
(
&
worker
->
thread
,
&
thAttr
,
(
ThreadFp
)
tWWorkerThreadFp
,
worker
)
!=
0
)
goto
_OVER
;
u
Debug
(
"worker:%s:%d is launched, max:%d"
,
pool
->
name
,
worker
->
id
,
pool
->
max
);
u
Info
(
"worker:%s:%d is launched, max:%d"
,
pool
->
name
,
worker
->
id
,
pool
->
max
);
pool
->
nextId
=
(
pool
->
nextId
+
1
)
%
pool
->
max
;
pool
->
nextId
=
(
pool
->
nextId
+
1
)
%
pool
->
max
;
taosThreadAttrDestroy
(
&
thAttr
);
taosThreadAttrDestroy
(
&
thAttr
);
...
@@ -259,13 +390,14 @@ _OVER:
...
@@ -259,13 +390,14 @@ _OVER:
}
else
{
}
else
{
while
(
worker
->
pid
<=
0
)
taosMsleep
(
10
);
while
(
worker
->
pid
<=
0
)
taosMsleep
(
10
);
queue
->
threadId
=
worker
->
pid
;
queue
->
threadId
=
worker
->
pid
;
uInfo
(
"worker:%s, queue:%p is allocated, ahandle:%p thread:%08"
PRId64
,
pool
->
name
,
queue
,
ahandle
,
queue
->
threadId
);
uInfo
(
"worker:%s, queue:%p is allocated, ahandle:%p thread:%08"
PRId64
,
pool
->
name
,
queue
,
ahandle
,
queue
->
threadId
);
return
queue
;
return
queue
;
}
}
}
}
void
tWWorkerFreeQueue
(
SWWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
void
tWWorkerFreeQueue
(
SWWorkerPool
*
pool
,
STaosQueue
*
queue
)
{
u
Debug
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
u
Info
(
"worker:%s, queue:%p is freed"
,
pool
->
name
,
queue
);
taosCloseQueue
(
queue
);
taosCloseQueue
(
queue
);
}
}
...
...
tests/script/tsim/dnode/balance1.sim
浏览文件 @
ba5a6484
...
@@ -74,10 +74,10 @@ sql insert into d2.t2 values(now+5s, 21)
...
@@ -74,10 +74,10 @@ sql insert into d2.t2 values(now+5s, 21)
sql select * from information_schema.ins_dnodes
sql select * from information_schema.ins_dnodes
print dnode1 openVnodes $data(1)[2]
print dnode1 openVnodes $data(1)[2]
print dnode2 openVnodes $data(2)[2]
print dnode2 openVnodes $data(2)[2]
if $data(1)[2] !=
0
then
if $data(1)[2] !=
1
then
return -1
return -1
endi
endi
if $data(2)[2] !=
2
then
if $data(2)[2] !=
1
then
return -1
return -1
endi
endi
...
...
tests/script/tsim/dnode/balance2.sim
浏览文件 @
ba5a6484
...
@@ -161,13 +161,13 @@ print dnode1 openVnodes $data(1)[2]
...
@@ -161,13 +161,13 @@ print dnode1 openVnodes $data(1)[2]
print dnode3 openVnodes $data(3)[2]
print dnode3 openVnodes $data(3)[2]
print dnode4 openVnodes $data(4)[2]
print dnode4 openVnodes $data(4)[2]
print dnode5 openVnodes $data(5)[2]
print dnode5 openVnodes $data(5)[2]
if $data(1)[2] !=
2
then
if $data(1)[2] !=
3
then
return -1
return -1
endi
endi
if $data(3)[2] != 3 then
if $data(3)[2] != 3 then
return -1
return -1
endi
endi
if $data(4)[2] !=
4
then
if $data(4)[2] !=
3
then
return -1
return -1
endi
endi
if $data(5)[2] != 3 then
if $data(5)[2] != 3 then
...
...
tests/script/tsim/dnode/balance3.sim
浏览文件 @
ba5a6484
...
@@ -127,10 +127,10 @@ print dnode1 openVnodes $data(1)[2]
...
@@ -127,10 +127,10 @@ print dnode1 openVnodes $data(1)[2]
print dnode2 openVnodes $data(2)[2]
print dnode2 openVnodes $data(2)[2]
print dnode3 openVnodes $data(3)[2]
print dnode3 openVnodes $data(3)[2]
print dnode4 openVnodes $data(4)[2]
print dnode4 openVnodes $data(4)[2]
if $data(1)[2] !=
0
then
if $data(1)[2] !=
1
then
return -1
return -1
endi
endi
if $data(2)[2] !=
2
then
if $data(2)[2] !=
1
then
return -1
return -1
endi
endi
if $data(3)[2] != 2 then
if $data(3)[2] != 2 then
...
@@ -228,10 +228,10 @@ print dnode1 openVnodes $data(1)[2]
...
@@ -228,10 +228,10 @@ print dnode1 openVnodes $data(1)[2]
print dnode3 openVnodes $data(3)[2]
print dnode3 openVnodes $data(3)[2]
print dnode4 openVnodes $data(4)[2]
print dnode4 openVnodes $data(4)[2]
print dnode5 openVnodes $data(5)[2]
print dnode5 openVnodes $data(5)[2]
if $data(1)[2] !=
1
then
if $data(1)[2] !=
2
then
return -1
return -1
endi
endi
if $data(3)[2] !=
3
then
if $data(3)[2] !=
2
then
return -1
return -1
endi
endi
if $data(4)[2] != 3 then
if $data(4)[2] != 3 then
...
...
tests/script/tsim/dnode/balancex.sim
浏览文件 @
ba5a6484
...
@@ -142,10 +142,10 @@ print dnode1 openVnodes $data(1)[2]
...
@@ -142,10 +142,10 @@ print dnode1 openVnodes $data(1)[2]
print dnode2 openVnodes $data(2)[2]
print dnode2 openVnodes $data(2)[2]
print dnode2 openVnodes $data(3)[2]
print dnode2 openVnodes $data(3)[2]
print dnode2 openVnodes $data(4)[2]
print dnode2 openVnodes $data(4)[2]
if $data(1)[2] !=
0
then
if $data(1)[2] !=
1
then
return -1
return -1
endi
endi
if $data(2)[2] !=
2
then
if $data(2)[2] !=
1
then
return -1
return -1
endi
endi
if $data(3)[2] != 2 then
if $data(3)[2] != 2 then
...
...
tests/script/tsim/dnode/vnode_clean.sim
浏览文件 @
ba5a6484
...
@@ -71,10 +71,10 @@ sql insert into d2.t2 values(now+5s, 21)
...
@@ -71,10 +71,10 @@ sql insert into d2.t2 values(now+5s, 21)
sql select * from information_schema.ins_dnodes
sql select * from information_schema.ins_dnodes
print dnode1 openVnodes $data(1)[2]
print dnode1 openVnodes $data(1)[2]
print dnode2 openVnodes $data(2)[2]
print dnode2 openVnodes $data(2)[2]
if $data(1)[2] !=
0
then
if $data(1)[2] !=
1
then
return -1
return -1
endi
endi
if $data(2)[2] !=
2
then
if $data(2)[2] !=
1
then
return -1
return -1
endi
endi
...
@@ -181,10 +181,10 @@ sql select * from information_schema.ins_dnodes
...
@@ -181,10 +181,10 @@ sql select * from information_schema.ins_dnodes
print dnode1 openVnodes $data(1)[2]
print dnode1 openVnodes $data(1)[2]
print dnode2 openVnodes $data(3)[2]
print dnode2 openVnodes $data(3)[2]
print dnode2 openVnodes $data(4)[2]
print dnode2 openVnodes $data(4)[2]
if $data(1)[2] !=
0
then
if $data(1)[2] !=
1
then
return -1
return -1
endi
endi
if $data(3)[2] !=
2
then
if $data(3)[2] !=
1
then
return -1
return -1
endi
endi
if $data(4)[2] != 1 then
if $data(4)[2] != 1 then
...
@@ -204,10 +204,10 @@ sql select * from information_schema.ins_dnodes
...
@@ -204,10 +204,10 @@ sql select * from information_schema.ins_dnodes
print dnode1 openVnodes $data(1)[2]
print dnode1 openVnodes $data(1)[2]
print dnode2 openVnodes $data(3)[2]
print dnode2 openVnodes $data(3)[2]
print dnode2 openVnodes $data(4)[2]
print dnode2 openVnodes $data(4)[2]
if $data(1)[2] !=
0
then
if $data(1)[2] !=
1
then
return -1
return -1
endi
endi
if $data(3)[2] !=
2
then
if $data(3)[2] !=
1
then
return -1
return -1
endi
endi
if $data(4)[2] != 2 then
if $data(4)[2] != 2 then
...
@@ -220,13 +220,13 @@ sql select * from information_schema.ins_dnodes
...
@@ -220,13 +220,13 @@ sql select * from information_schema.ins_dnodes
print dnode1 openVnodes $data(1)[2]
print dnode1 openVnodes $data(1)[2]
print dnode2 openVnodes $data(3)[2]
print dnode2 openVnodes $data(3)[2]
print dnode2 openVnodes $data(4)[2]
print dnode2 openVnodes $data(4)[2]
if $data(1)[2] !=
1
then
if $data(1)[2] !=
2
then
return -1
return -1
endi
endi
if $data(3)[2] != null then
if $data(3)[2] != null then
return -1
return -1
endi
endi
if $data(4)[2] !=
3
then
if $data(4)[2] !=
2
then
return -1
return -1
endi
endi
...
...
tests/system-test/compatibilityAllTest.sh
浏览文件 @
ba5a6484
...
@@ -42,5 +42,5 @@ python3 ./test.py -f 7-tmq/tmqCheckData.py
...
@@ -42,5 +42,5 @@ python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py
-f
7-tmq/tmqCheckData1.py
python3 ./test.py
-f
7-tmq/tmqCheckData1.py
python3 ./test.py
-f
7-tmq/tmqConsumerGroup.py
python3 ./test.py
-f
7-tmq/tmqConsumerGroup.py
python3 ./test.py
-f
7-tmq/tmqShow.py
python3 ./test.py
-f
7-tmq/tmqShow.py
python3 ./test.py
-f
7-tmq/tmqAlterSchema.
python3 ./test.py
-f
7-tmq/tmqAlterSchema.py
python3 ./test.py
-f
99-TDcase/TD-20582.py
python3 ./test.py
-f
99-TDcase/TD-20582.py
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录