Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2e1d69ce
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2e1d69ce
编写于
7月 12, 2022
作者:
J
jiacy-jcy
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into 3.0test/jcy
上级
bd4aa82d
55b94558
变更
38
展开全部
隐藏空白更改
内联
并排
Showing
38 changed file
with
2225 addition
and
1644 deletion
+2225
-1644
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+1
-0
include/util/tlockfree.h
include/util/tlockfree.h
+1
-2
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+4
-0
source/client/src/clientMain.c
source/client/src/clientMain.c
+15
-5
source/common/src/tglobal.c
source/common/src/tglobal.c
+1
-1
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+3
-1
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+6
-0
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+1
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-0
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+6
-0
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+16
-16
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-17
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+0
-98
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+107
-99
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+0
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+3
-2
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+1
-0
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+4
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+23
-5
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+1
-1
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+1082
-1116
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+16
-16
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+16
-16
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+2
-3
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+17
-52
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+10
-2
source/util/src/tlockfree.c
source/util/src/tlockfree.c
+11
-39
tests/script/tsim/sync/electTest.sim
tests/script/tsim/sync/electTest.sim
+193
-0
tests/system-test/2-query/count_partition.py
tests/system-test/2-query/count_partition.py
+176
-0
tests/system-test/2-query/max_partition.py
tests/system-test/2-query/max_partition.py
+189
-0
tests/system-test/7-tmq/tmqAutoCreateTbl.py
tests/system-test/7-tmq/tmqAutoCreateTbl.py
+49
-144
tests/system-test/7-tmq/tmqDnodeRestart.py
tests/system-test/7-tmq/tmqDnodeRestart.py
+253
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+10
-4
tools/taosws-rs
tools/taosws-rs
+1
-1
未找到文件。
include/dnode/mnode/mnode.h
浏览文件 @
2e1d69ce
...
...
@@ -52,6 +52,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption);
* @param pMnode The mnode object to close.
*/
void
mndClose
(
SMnode
*
pMnode
);
void
mndPreClose
(
SMnode
*
pMnode
);
/**
* @brief Start mnode
...
...
include/util/tlockfree.h
浏览文件 @
2e1d69ce
...
...
@@ -69,10 +69,9 @@ typedef void (*_ref_fn_t)(const void *pObj);
#define T_REF_VAL_GET(x) (x)->_ref.val
// single writer multiple reader lock
typedef
volatile
int
64
_t
SRWLatch
;
typedef
volatile
int
32
_t
SRWLatch
;
void
taosInitRWLatch
(
SRWLatch
*
pLatch
);
void
taosInitReentrantRWLatch
(
SRWLatch
*
pLatch
);
void
taosWLockLatch
(
SRWLatch
*
pLatch
);
void
taosWUnLockLatch
(
SRWLatch
*
pLatch
);
void
taosRLockLatch
(
SRWLatch
*
pLatch
);
...
...
source/client/inc/clientInt.h
浏览文件 @
2e1d69ce
...
...
@@ -169,6 +169,7 @@ typedef struct SReqResultInfo {
uint32_t
numOfRows
;
uint64_t
totalRows
;
uint32_t
current
;
bool
localResultFetched
;
bool
completed
;
int32_t
precision
;
bool
convertUcs4
;
...
...
source/client/src/clientImpl.c
浏览文件 @
2e1d69ce
...
...
@@ -1905,6 +1905,10 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i
tbLen
=
len1
;
}
if
(
dbLen
<=
0
||
tbLen
<=
0
)
{
return
-
1
;
}
if
(
tNameSetDbName
(
&
name
,
acctId
,
dbName
,
dbLen
))
{
return
-
1
;
}
...
...
source/client/src/clientMain.c
浏览文件 @
2e1d69ce
...
...
@@ -852,23 +852,33 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
}
// all data has returned to App already, no need to try again
if
(
(
pResultInfo
->
pData
==
NULL
||
pResultInfo
->
current
>=
pResultInfo
->
numOfRows
)
&&
pResultInfo
->
completed
)
{
if
(
pResultInfo
->
completed
&&
(
pRequest
->
body
.
queryJob
!=
0
)
)
{
pResultInfo
->
numOfRows
=
0
;
pRequest
->
body
.
fetchFp
(
param
,
pRequest
,
pResultInfo
->
numOfRows
);
return
;
}
// it is a local executed query, no need to do async fetch
if
(
pResultInfo
->
current
<
pResultInfo
->
numOfRows
&&
pRequest
->
body
.
queryJob
==
0
)
{
pRequest
->
body
.
fetchFp
(
param
,
pRequest
,
pResultInfo
->
numOfRows
);
if
(
pRequest
->
body
.
queryJob
==
0
)
{
ASSERT
(
pResultInfo
->
completed
&&
pResultInfo
->
numOfRows
>=
0
);
if
(
pResultInfo
->
localResultFetched
)
{
pResultInfo
->
numOfRows
=
0
;
pResultInfo
->
current
=
0
;
pRequest
->
body
.
fetchFp
(
param
,
pRequest
,
pResultInfo
->
numOfRows
);
}
else
{
pResultInfo
->
localResultFetched
=
true
;
pRequest
->
body
.
fetchFp
(
param
,
pRequest
,
pResultInfo
->
numOfRows
);
}
return
;
}
SSchedulerReq
req
=
{
.
syncReq
=
false
,
.
fetchFp
=
fetchCallback
,
.
cbParam
=
pRequest
,
};
schedulerFetchRows
(
pRequest
->
body
.
queryJob
,
&
req
);
}
...
...
@@ -880,10 +890,10 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
// set the current block is all consumed
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
pResultInfo
->
convertUcs4
=
false
;
taos_fetch_rows_a
(
res
,
fp
,
param
);
// it is a local executed query, no need to do async fetch
taos_fetch_rows_a
(
pRequest
,
fp
,
param
);
}
const
void
*
taos_get_raw_block
(
TAOS_RES
*
res
)
{
...
...
source/common/src/tglobal.c
浏览文件 @
2e1d69ce
...
...
@@ -114,7 +114,7 @@ int32_t tsMinSlidingTime = 10;
// the maxinum number of distict query result
int32_t
tsMaxNumOfDistinctResults
=
1000
*
10000
;
// 1
us
for interval time range, changed accordingly
// 1
database precision unit
for interval time range, changed accordingly
int32_t
tsMinIntervalTime
=
1
;
// 20sec, the maximum value of stream computing delay, changed accordingly
...
...
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
浏览文件 @
2e1d69ce
...
...
@@ -150,6 +150,7 @@ static void mmStop(SMnodeMgmt *pMgmt) {
dDebug
(
"mnode-mgmt start to stop"
);
taosThreadRwlockWrlock
(
&
pMgmt
->
lock
);
pMgmt
->
stopped
=
1
;
mndPreClose
(
pMgmt
->
pMnode
);
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
mndStop
(
pMgmt
->
pMnode
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
2e1d69ce
...
...
@@ -75,11 +75,13 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void
vmCloseVnode
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodePreClose
(
pVnode
->
pImpl
);
taosThreadRwlockWrlock
(
&
pMgmt
->
lock
);
taosHashRemove
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
while
(
pVnode
->
refCount
>
0
)
taosMsleep
(
10
);
dTrace
(
"vgId:%d, wait for vnode queue is empty"
,
pVnode
->
vgId
);
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
2e1d69ce
...
...
@@ -366,6 +366,12 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return
pMnode
;
}
void
mndPreClose
(
SMnode
*
pMnode
)
{
if
(
pMnode
!=
NULL
)
{
syncLeaderTransfer
(
pMnode
->
syncMgmt
.
sync
);
}
}
void
mndClose
(
SMnode
*
pMnode
)
{
if
(
pMnode
!=
NULL
)
{
mDebug
(
"start to close mnode"
);
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
2e1d69ce
...
...
@@ -199,6 +199,7 @@ int32_t mndInitSync(SMnode *pMnode) {
}
// decrease election timer
setPingTimerMS
(
pMgmt
->
sync
,
5000
);
setElectTimerMS
(
pMgmt
->
sync
,
600
);
setHeartbeatTimerMS
(
pMgmt
->
sync
,
300
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
2e1d69ce
...
...
@@ -51,6 +51,7 @@ void vnodeCleanup();
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
STfs
*
pTfs
);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
void
vnodePreClose
(
SVnode
*
pVnode
);
void
vnodeClose
(
SVnode
*
pVnode
);
int32_t
vnodeStart
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
2e1d69ce
...
...
@@ -175,6 +175,12 @@ _err:
return
NULL
;
}
void
vnodePreClose
(
SVnode
*
pVnode
)
{
if
(
pVnode
)
{
syncLeaderTransfer
(
pVnode
->
sync
);
}
}
void
vnodeClose
(
SVnode
*
pVnode
)
{
if
(
pVnode
)
{
vnodeCommit
(
pVnode
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
2e1d69ce
...
...
@@ -569,7 +569,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return
-
1
;
}
setPingTimerMS
(
pVnode
->
sync
,
3
000
);
setPingTimerMS
(
pVnode
->
sync
,
5
000
);
setElectTimerMS
(
pVnode
->
sync
,
500
);
setHeartbeatTimerMS
(
pVnode
->
sync
,
100
);
return
0
;
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
2e1d69ce
...
...
@@ -482,33 +482,33 @@ typedef struct SCtgOperation {
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_
64
((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) > 0); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) > 0); \
} else { \
assert(atomic_load_
64
((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_
64
((_lock)) > 0); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) > 0); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
} else { \
assert(atomic_load_
64
((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
} \
} while (0)
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
2e1d69ce
...
...
@@ -538,7 +538,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
return
code
;
}
static
int32_t
doAggregateImpl
(
SOperatorInfo
*
pOperator
,
TSKEY
startTs
,
SqlFunctionCtx
*
pCtx
)
{
static
int32_t
doAggregateImpl
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
)
{
for
(
int32_t
k
=
0
;
k
<
pOperator
->
exprSupp
.
numOfExprs
;
++
k
)
{
if
(
functionNeedToExecute
(
&
pCtx
[
k
]))
{
// todo add a dummy funtion to avoid process check
...
...
@@ -2969,25 +2969,10 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setExecutionContext
(
pOperator
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
->
info
.
groupId
,
pAggInfo
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
order
,
scanFlag
,
true
);
code
=
doAggregateImpl
(
pOperator
,
0
,
pSup
->
pCtx
);
code
=
doAggregateImpl
(
pOperator
,
pSup
->
pCtx
);
if
(
code
!=
0
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
#if 0 // test for encode/decode result info
if(pOperator->fpSet.encodeResultRow){
char *result = NULL;
int32_t length = 0;
pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
SAggSupporter* pSup = &pAggInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable);
pInfo->resultRowInfo.size = 0;
pOperator->fpSet.decodeResultRow(pOperator, result);
if(result){
taosMemoryFree(result);
}
}
#endif
}
closeAllResultRows
(
&
pAggInfo
->
binfo
.
resultRowInfo
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
2e1d69ce
...
...
@@ -2860,101 +2860,3 @@ _error:
return
NULL
;
}
static
SSDataBlock
*
doScanLastrow
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SLastrowScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pTableList
);
if
(
size
==
0
)
{
setTaskStatus
(
pTaskInfo
,
TASK_COMPLETED
);
return
NULL
;
}
// check if it is a group by tbname
if
(
size
==
taosArrayGetSize
(
pInfo
->
pTableList
))
{
blockDataCleanup
(
pInfo
->
pRes
);
tsdbRetrieveLastRow
(
pInfo
->
pLastrowReader
,
pInfo
->
pRes
,
pInfo
->
pSlotIds
);
return
(
pInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
{
// todo fetch the result for each group
}
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
static
void
destroyLastrowScanOperator
(
void
*
param
,
int32_t
numOfOutput
)
{
SLastrowScanInfo
*
pInfo
=
(
SLastrowScanInfo
*
)
param
;
blockDataDestroy
(
pInfo
->
pRes
);
tsdbLastrowReaderClose
(
pInfo
->
pLastrowReader
);
taosMemoryFreeClear
(
param
);
}
SOperatorInfo
*
createLastrowScanOperator
(
SLastRowScanPhysiNode
*
pScanNode
,
SReadHandle
*
readHandle
,
SArray
*
pTableList
,
SExecTaskInfo
*
pTaskInfo
)
{
SLastrowScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastrowScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
pTableList
=
pTableList
;
pInfo
->
readHandle
=
*
readHandle
;
pInfo
->
pRes
=
createResDataBlock
(
pScanNode
->
node
.
pOutputDataBlockDesc
);
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanNode
->
pScanCols
,
pScanNode
->
node
.
pOutputDataBlockDesc
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
int32_t
*
pCols
=
taosMemoryMalloc
(
numOfCols
*
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchInfo
*
pColMatch
=
taosArrayGet
(
pInfo
->
pColMatchInfo
,
i
);
pCols
[
i
]
=
pColMatch
->
colId
;
}
pInfo
->
pSlotIds
=
taosMemoryMalloc
(
numOfCols
*
sizeof
(
pInfo
->
pSlotIds
[
0
]));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchInfo
*
pColMatch
=
taosArrayGet
(
pInfo
->
pColMatchInfo
,
i
);
for
(
int32_t
j
=
0
;
j
<
pTaskInfo
->
schemaVer
.
sw
->
nCols
;
++
j
)
{
if
(
pColMatch
->
colId
==
pTaskInfo
->
schemaVer
.
sw
->
pSchema
[
j
].
colId
&&
pColMatch
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
pInfo
->
pSlotIds
[
pColMatch
->
targetSlotId
]
=
-
1
;
break
;
}
if
(
pColMatch
->
colId
==
pTaskInfo
->
schemaVer
.
sw
->
pSchema
[
j
].
colId
)
{
pInfo
->
pSlotIds
[
pColMatch
->
targetSlotId
]
=
j
;
break
;
}
}
}
tsdbLastRowReaderOpen
(
readHandle
->
vnode
,
LASTROW_RETRIEVE_TYPE_ALL
,
pTableList
,
pCols
,
numOfCols
,
&
pInfo
->
pLastrowReader
);
taosMemoryFree
(
pCols
);
pOperator
->
name
=
"LastrowScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
initResultSizeInfo
(
pOperator
,
1024
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doScanLastrow
,
NULL
,
NULL
,
destroyLastrowScanOperator
,
NULL
,
NULL
,
NULL
);
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pOperator
);
return
NULL
;
}
source/libs/function/src/builtinsimpl.c
浏览文件 @
2e1d69ce
...
...
@@ -338,6 +338,104 @@ typedef struct SGroupKeyInfo {
} \
} while (0)
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t* d = (_t*)(_col->pData); \
for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
}; \
(_res) += (d)[i]; \
(numOfElem)++; \
} \
} while (0)
#define LIST_SUB_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t* d = (_t*)(_col->pData); \
for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
}; \
(_res) -= (d)[i]; \
(numOfElem)++; \
} \
} while (0)
#define LIST_AVG_N(sumT, T) \
do { \
T* plist = (T*)pCol->pData; \
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { \
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
continue; \
} \
\
numOfElem += 1; \
pAvgRes->count -= 1; \
sumT -= plist[i]; \
} \
} while (0)
#define LIST_STDDEV_SUB_N(sumT, T) \
do { \
T* plist = (T*)pCol->pData; \
for (int32_t i = start; i < numOfRows + start; ++i) { \
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
continue; \
} \
numOfElem += 1; \
pStddevRes->count -= 1; \
sumT -= plist[i]; \
pStddevRes->quadraticISum -= plist[i] * plist[i]; \
} \
} while (0)
#define LEASTSQR_CAL(p, x, y, index, step) \
do { \
(p)[0][0] += (double)(x) * (x); \
(p)[0][1] += (double)(x); \
(p)[0][2] += (double)(x) * (y)[index]; \
(p)[1][2] += (y)[index]; \
(x) += step; \
} while (0)
#define STATE_COMP(_op, _lval, _param) STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_param))
#define GET_STATE_VAL(param) ((param.nType == TSDB_DATA_TYPE_BIGINT) ? (param.i) : (param.d))
#define STATE_COMP_IMPL(_op, _lval, _rval) \
do { \
switch (_op) { \
case STATE_OPER_LT: \
return ((_lval) < (_rval)); \
break; \
case STATE_OPER_GT: \
return ((_lval) > (_rval)); \
break; \
case STATE_OPER_LE: \
return ((_lval) <= (_rval)); \
break; \
case STATE_OPER_GE: \
return ((_lval) >= (_rval)); \
break; \
case STATE_OPER_NE: \
return ((_lval) != (_rval)); \
break; \
case STATE_OPER_EQ: \
return ((_lval) == (_rval)); \
break; \
default: \
break; \
} \
} while (0)
#define INIT_INTP_POINT(_p, _k, _v) \
do { \
(_p).key = (_k); \
(_p).val = (_v); \
} while (0)
bool
dummyGetEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
UNUSED_PARAM
(
pEnv
))
{
return
true
;
}
bool
dummyInit
(
SqlFunctionCtx
*
UNUSED_PARAM
(
pCtx
),
SResultRowEntryInfo
*
UNUSED_PARAM
(
pResultInfo
))
{
return
true
;
}
...
...
@@ -499,30 +597,6 @@ int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return
TSDB_CODE_SUCCESS
;
}
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t* d = (_t*)(_col->pData); \
for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
}; \
(_res) += (d)[i]; \
(numOfElem)++; \
} \
} while (0)
#define LIST_SUB_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t* d = (_t*)(_col->pData); \
for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
}; \
(_res) -= (d)[i]; \
(numOfElem)++; \
} \
} while (0)
int32_t
sumFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
...
...
@@ -920,20 +994,6 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
return
TSDB_CODE_SUCCESS
;
}
#define LIST_AVG_N(sumT, T) \
do { \
T* plist = (T*)pCol->pData; \
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { \
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
continue; \
} \
\
numOfElem += 1; \
pAvgRes->count -= 1; \
sumT -= plist[i]; \
} \
} while (0)
int32_t
avgInvertFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
...
...
@@ -1884,20 +1944,6 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
return
TSDB_CODE_SUCCESS
;
}
#define LIST_STDDEV_SUB_N(sumT, T) \
do { \
T* plist = (T*)pCol->pData; \
for (int32_t i = start; i < numOfRows + start; ++i) { \
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
continue; \
} \
numOfElem += 1; \
pStddevRes->count -= 1; \
sumT -= plist[i]; \
pStddevRes->quadraticISum -= plist[i] * plist[i]; \
} \
} while (0)
int32_t
stddevInvertFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
...
...
@@ -2046,15 +2092,6 @@ bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInf
return
true
;
}
#define LEASTSQR_CAL(p, x, y, index, step) \
do { \
(p)[0][0] += (double)(x) * (x); \
(p)[0][1] += (double)(x); \
(p)[0][2] += (double)(x) * (y)[index]; \
(p)[1][2] += (y)[index]; \
(x) += step; \
} while (0)
int32_t
leastSQRFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
...
...
@@ -2733,7 +2770,6 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
}
}
pInfo
->
hasResult
=
true
;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo
->
numOfRes
=
1
;
break
;
}
...
...
@@ -2830,7 +2866,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
pInfo
->
hasResult
=
true
;
pResInfo
->
numOfRes
=
1
;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
break
;
}
...
...
@@ -4477,36 +4512,6 @@ static int8_t getStateOpType(char* opStr) {
return
opType
;
}
#define GET_STATE_VAL(param) ((param.nType == TSDB_DATA_TYPE_BIGINT) ? (param.i) : (param.d))
#define STATE_COMP(_op, _lval, _param) STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_param))
#define STATE_COMP_IMPL(_op, _lval, _rval) \
do { \
switch (_op) { \
case STATE_OPER_LT: \
return ((_lval) < (_rval)); \
break; \
case STATE_OPER_GT: \
return ((_lval) > (_rval)); \
break; \
case STATE_OPER_LE: \
return ((_lval) <= (_rval)); \
break; \
case STATE_OPER_GE: \
return ((_lval) >= (_rval)); \
break; \
case STATE_OPER_NE: \
return ((_lval) != (_rval)); \
break; \
case STATE_OPER_EQ: \
return ((_lval) == (_rval)); \
break; \
default: \
break; \
} \
} while (0)
static
bool
checkStateOp
(
int8_t
op
,
SColumnInfoData
*
pCol
,
int32_t
index
,
SVariant
param
)
{
char
*
data
=
colDataGetData
(
pCol
,
index
);
switch
(
pCol
->
info
.
type
)
{
...
...
@@ -5214,12 +5219,6 @@ static double twa_get_area(SPoint1 s, SPoint1 e) {
return
val
;
}
#define INIT_INTP_POINT(_p, _k, _v) \
do { \
(_p).key = (_k); \
(_p).val = (_v); \
} while (0)
int32_t
twaFunction
(
SqlFunctionCtx
*
pCtx
)
{
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
...
...
@@ -6009,6 +6008,15 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
pInfo
->
hasResult
=
true
;
pResInfo
->
numOfRes
=
1
;
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
STuplePos
*
pTuplePos
=
(
STuplePos
*
)(
pInfo
->
buf
+
bytes
+
sizeof
(
TSKEY
));
if
(
!
pInfo
->
hasResult
)
{
saveTupleData
(
pCtx
,
i
,
pCtx
->
pSrcBlock
,
pTuplePos
);
}
else
{
copyTupleData
(
pCtx
,
i
,
pCtx
->
pSrcBlock
,
pTuplePos
);
}
}
}
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
2e1d69ce
...
...
@@ -2481,7 +2481,6 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
int32_t
code
=
tjsonToObject
(
pJson
,
jkSubplanId
,
jsonToSubplanId
,
&
pNode
->
id
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkSubplanType
,
pNode
->
subplanType
,
code
);
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkSubplanMsgType
,
&
pNode
->
msgType
);
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
2e1d69ce
...
...
@@ -956,7 +956,8 @@ void nodesDestroyNode(SNode* pNode) {
}
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
{
SSubplan
*
pSubplan
=
(
SSubplan
*
)
pNode
;
nodesDestroyList
(
pSubplan
->
pChildren
);
// nodesDestroyList(pSubplan->pChildren);
nodesClearList
(
pSubplan
->
pChildren
);
nodesDestroyNode
((
SNode
*
)
pSubplan
->
pNode
);
nodesDestroyNode
((
SNode
*
)
pSubplan
->
pDataSink
);
nodesDestroyNode
((
SNode
*
)
pSubplan
->
pTagCond
);
...
...
@@ -972,7 +973,7 @@ void nodesDestroyNode(SNode* pNode) {
SNode
*
pElement
=
NULL
;
FOREACH
(
pElement
,
pPlan
->
pSubplans
)
{
if
(
first
)
{
first
=
false
;
//
first = false;
nodesDestroyNode
(
pElement
);
}
else
{
nodesClearList
(((
SNodeListNode
*
)
pElement
)
->
pNodeList
);
...
...
source/libs/parser/inc/sql.y
浏览文件 @
2e1d69ce
...
...
@@ -556,6 +556,7 @@ signed_literal(A) ::= TIMESTAMP NK_STRING(B).
signed_literal(A) ::= duration_literal(B). { A = releaseRawExprNode(pCxt, B); }
signed_literal(A) ::= NULL(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_NULL, &B); }
signed_literal(A) ::= literal_func(B). { A = releaseRawExprNode(pCxt, B); }
signed_literal(A) ::= NK_QUESTION(B). { A = createPlaceholderValueNode(pCxt, &B); }
%type literal_list { SNodeList* }
%destructor literal_list { nodesDestroyList($$); }
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
2e1d69ce
...
...
@@ -133,7 +133,10 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
assert
(
*
p
==
TS_PATH_DELIMITER
[
0
]);
int32_t
dbLen
=
p
-
pTableName
->
z
;
char
name
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
dbLen
<=
0
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
);
}
char
name
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
strncpy
(
name
,
pTableName
->
z
,
dbLen
);
dbLen
=
strdequote
(
name
);
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
2e1d69ce
...
...
@@ -2173,14 +2173,28 @@ static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni
return
-
1
;
}
static
const
char
*
getPrecisionStr
(
uint8_t
precision
)
{
switch
(
precision
)
{
case
TSDB_TIME_PRECISION_MILLI
:
return
TSDB_TIME_PRECISION_MILLI_STR
;
case
TSDB_TIME_PRECISION_MICRO
:
return
TSDB_TIME_PRECISION_MICRO_STR
;
case
TSDB_TIME_PRECISION_NANO
:
return
TSDB_TIME_PRECISION_NANO_STR
;
default:
break
;
}
return
"unknown"
;
}
static
int32_t
checkIntervalWindow
(
STranslateContext
*
pCxt
,
SIntervalWindowNode
*
pInterval
)
{
uint8_t
precision
=
((
SColumnNode
*
)
pInterval
->
pCol
)
->
node
.
resType
.
precision
;
SValueNode
*
pInter
=
(
SValueNode
*
)
pInterval
->
pInterval
;
bool
valInter
=
TIME_IS_VAR_DURATION
(
pInter
->
unit
);
if
(
pInter
->
datum
.
i
<=
0
||
(
!
valInter
&&
convertTimePrecision
(
pInter
->
datum
.
i
,
precision
,
TSDB_TIME_PRECISION_MICRO
)
<
tsMinIntervalTime
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL
,
tsMinIntervalTime
);
if
(
pInter
->
datum
.
i
<=
0
||
(
!
valInter
&&
pInter
->
datum
.
i
<
tsMinIntervalTime
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL
,
tsMinIntervalTime
,
getPrecisionStr
(
precision
)
);
}
if
(
NULL
!=
pInterval
->
pOffset
)
{
...
...
@@ -2754,6 +2768,11 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
}
}
if
(
NULL
==
pPrimaryKeyExpr
)
{
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMNS_NUM
,
"Primary timestamp column can not be null"
);
}
return
addOrderByPrimaryKeyToQuery
(
pCxt
,
pPrimaryKeyExpr
,
pInsert
->
pQuery
);
}
...
...
@@ -2998,8 +3017,7 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
int32_t
code
=
checkRangeOption
(
pCxt
,
"buffer"
,
pOptions
->
buffer
,
TSDB_MIN_BUFFER_PER_VNODE
,
TSDB_MAX_BUFFER_PER_VNODE
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkRangeOption
(
pCxt
,
"cacheLast"
,
pOptions
->
cacheLast
,
TSDB_MIN_DB_CACHE_LAST
,
TSDB_MAX_DB_CACHE_LAST
);
code
=
checkRangeOption
(
pCxt
,
"cacheLast"
,
pOptions
->
cacheLast
,
TSDB_MIN_DB_CACHE_LAST
,
TSDB_MAX_DB_CACHE_LAST
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkRangeOption
(
pCxt
,
"cacheLastSize"
,
pOptions
->
cacheLastSize
,
TSDB_MIN_DB_CACHE_LAST_SIZE
,
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
2e1d69ce
...
...
@@ -60,7 +60,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case
TSDB_CODE_PAR_EXPRIE_STATEMENT
:
return
"This statement is no longer supported"
;
case
TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL
:
return
"Interval cannot be less than %d
u
s"
;
return
"Interval cannot be less than %d
%
s"
;
case
TSDB_CODE_PAR_DB_NOT_SPECIFIED
:
return
"Database not specified"
;
case
TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME
:
...
...
source/libs/parser/src/sql.c
浏览文件 @
2e1d69ce
此差异已折叠。
点击以展开。
source/libs/qworker/inc/qwInt.h
浏览文件 @
2e1d69ce
...
...
@@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt {
#define QW_LOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
assert(atomic_load_
64
((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) > 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) > 0); \
} else { \
assert(atomic_load_
64
((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WLOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
assert(atomic_load_
64
((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
} else { \
assert(atomic_load_
64
((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WULOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
} \
} while (0)
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
2e1d69ce
...
...
@@ -367,33 +367,33 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOCK(type, _lock) do { \
if (SCH_READ == (type)) { \
assert(atomic_load_
64
(_lock) >= 0); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
(_lock) >= 0); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
(_lock) > 0); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
(_lock) > 0); \
} else { \
assert(atomic_load_
64
(_lock) >= 0); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
(_lock) >= 0); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64(_lock) &
TD_RWLATCH_WRITE_FLAG_COPY); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32(_lock) ==
TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define SCH_UNLOCK(type, _lock) do { \
if (SCH_READ == (type)) { \
assert(atomic_load_
64
((_lock)) > 0); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) > 0); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) >= 0); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
} else { \
assert(atomic_load_
64
((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%
" PRIx64 ", %s:%d B", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%
d, %s:%d B", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%
" PRIx64 ", %s:%d E", (_lock), atomic_load_64
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
64
((_lock)) >= 0); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%
d, %s:%d E", (_lock), atomic_load_32
(_lock), __FILE__, __LINE__); \
assert(atomic_load_
32
((_lock)) >= 0); \
} \
} while (0)
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
2e1d69ce
...
...
@@ -58,7 +58,6 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
if
(
NULL
==
pTask
->
execNodes
||
NULL
==
pTask
->
profile
.
execTime
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
taosInitReentrantRWLatch
(
&
pTask
->
lock
);
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_INIT
);
...
...
@@ -260,7 +259,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SSchTask
*
parent
=
*
(
SSchTask
**
)
taosArrayGet
(
pTask
->
parents
,
i
);
int32_t
readyNum
=
atomic_add_fetch_32
(
&
parent
->
childReady
,
1
);
SCH_LOCK
_TASK
(
parent
);
SCH_LOCK
(
SCH_WRITE
,
&
parent
->
planLock
);
SDownstreamSourceNode
source
=
{
.
type
=
QUERY_NODE_DOWNSTREAM_SOURCE
,
.
taskId
=
pTask
->
taskId
,
...
...
@@ -270,7 +269,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.
fetchMsgType
=
SCH_FETCH_TYPE
(
pTask
),
};
qSetSubplanExecutionNode
(
parent
->
plan
,
pTask
->
plan
->
id
.
groupId
,
&
source
);
SCH_UNLOCK
_TASK
(
parent
);
SCH_UNLOCK
(
SCH_WRITE
,
&
parent
->
planLock
);
if
(
SCH_TASK_READY_FOR_LAUNCH
(
readyNum
,
parent
))
{
SCH_TASK_DLOG
(
"all %d children task done, start to launch parent task 0x%"
PRIx64
,
readyNum
,
parent
->
taskId
);
...
...
source/libs/sync/inc/syncEnv.h
浏览文件 @
2e1d69ce
...
...
@@ -30,7 +30,7 @@ extern "C" {
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS
1
000
#define PING_TIMER_MS
5
000
#define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
2e1d69ce
...
...
@@ -273,16 +273,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
->
peersNum
==
0
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SNodeInfo
newLeader
=
(
pSyncNode
->
peersNodeInfo
)[
0
];
int32_t
ret
=
syncNodeLeaderTransfer
(
pSyncNode
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
int32_t
ret
=
syncLeaderTransferTo
(
rid
,
newLeader
);
return
ret
;
}
...
...
@@ -293,25 +285,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
ret
=
0
;
if
(
pSyncNode
->
replicaNum
==
1
)
{
sError
(
"only one replica, cannot drop leader"
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
terrno
=
TSDB_CODE_SYN_ONE_REPLICA
;
return
-
1
;
}
SyncLeaderTransfer
*
pMsg
=
syncLeaderTransferBuild
(
pSyncNode
->
vgId
);
pMsg
->
newLeaderId
.
addr
=
syncUtilAddr2U64
(
newLeader
.
nodeFqdn
,
newLeader
.
nodePort
);
pMsg
->
newLeaderId
.
vgId
=
pSyncNode
->
vgId
;
pMsg
->
newNodeInfo
=
newLeader
;
ASSERT
(
pMsg
!=
NULL
);
SRpcMsg
rpcMsg
=
{
0
};
syncLeaderTransfer2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncLeaderTransferDestroy
(
pMsg
);
ret
=
syncNodePropose
(
pSyncNode
,
&
rpcMsg
,
false
);
int32_t
ret
=
syncNodeLeaderTransferTo
(
pSyncNode
,
newLeader
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
ret
;
}
...
...
@@ -337,6 +312,12 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
return
-
1
;
}
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"begin leader transfer to %s:%u"
,
newLeader
.
nodeFqdn
,
newLeader
.
nodePort
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
SyncLeaderTransfer
*
pMsg
=
syncLeaderTransferBuild
(
pSyncNode
->
vgId
);
pMsg
->
newLeaderId
.
addr
=
syncUtilAddr2U64
(
newLeader
.
nodeFqdn
,
newLeader
.
nodePort
);
pMsg
->
newLeaderId
.
vgId
=
pSyncNode
->
vgId
;
...
...
@@ -1118,19 +1099,13 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop
(
pSyncNode
);
syncMaybeAdvanceCommitIndex
(
pSyncNode
);
return
;
}
else
{
syncNodeBecomeFollower
(
pSyncNode
,
"first start"
)
;
}
syncNodeBecomeFollower
(
pSyncNode
,
"first start"
);
// int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode);
// ASSERT(ret == 0);
if
(
gRaftDetailLog
)
{
syncNodeLog2
(
"==state change become leader immediately=="
,
pSyncNode
);
}
int32_t
ret
=
0
;
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
ASSERT
(
ret
==
0
);
}
void
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -1147,8 +1122,6 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
syncNodeEventLog
(
pSyncNode
,
"sync close"
);
// leader transfer
int32_t
ret
;
ASSERT
(
pSyncNode
!=
NULL
);
...
...
@@ -1183,14 +1156,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode
->
pNewNodeReceiver
=
NULL
;
}
/*
if (pSyncNode->pSnapshot != NULL) {
taosMemoryFree(pSyncNode->pSnapshot);
}
*/
// tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
}
...
...
@@ -1255,7 +1220,7 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
&
pSyncNode
->
pPingTimer
);
atomic_store_64
(
&
pSyncNode
->
pingTimerLogicClock
,
pSyncNode
->
pingTimerLogicClockUser
);
}
else
{
sError
(
"
sync env is stop, syncNodeStartPingTimer"
);
sError
(
"
vgId:%d, start ping timer error, sync env is stop"
,
pSyncNode
->
vgId
);
}
return
ret
;
}
...
...
@@ -1276,7 +1241,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
&
pSyncNode
->
pElectTimer
);
atomic_store_64
(
&
pSyncNode
->
electTimerLogicClock
,
pSyncNode
->
electTimerLogicClockUser
);
}
else
{
sError
(
"
sync env is stop, syncNodeStartElectTimer"
);
sError
(
"
vgId:%d, start elect timer error, sync env is stop"
,
pSyncNode
->
vgId
);
}
return
ret
;
}
...
...
@@ -1316,7 +1281,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
&
pSyncNode
->
pHeartbeatTimer
);
atomic_store_64
(
&
pSyncNode
->
heartbeatTimerLogicClock
,
pSyncNode
->
heartbeatTimerLogicClockUser
);
}
else
{
sError
(
"
sync env is stop, syncNodeStartHeartbeatTimer"
);
sError
(
"
vgId:%d, start heartbeat timer error, sync env is stop"
,
pSyncNode
->
vgId
);
}
return
ret
;
}
...
...
@@ -2643,7 +2608,7 @@ const char* syncStr(ESyncState state) {
static
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
)
{
SyncLeaderTransfer
*
pSyncLeaderTransfer
=
syncLeaderTransferFromRpcMsg2
(
pRpcMsg
);
syncNodeEventLog
(
ths
,
"
begin
leader transfer"
);
syncNodeEventLog
(
ths
,
"
do
leader transfer"
);
bool
sameId
=
syncUtilSameId
(
&
(
pSyncLeaderTransfer
->
newLeaderId
),
&
(
ths
->
myRaftId
));
bool
sameNodeInfo
=
strcmp
(
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
ths
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
...
...
source/libs/sync/src/syncTimeout.c
浏览文件 @
2e1d69ce
...
...
@@ -17,6 +17,11 @@
#include "syncElection.h"
#include "syncReplication.h"
int32_t
syncNodeTimerRoutine
(
SSyncNode
*
ths
)
{
syncNodeEventLog
(
ths
,
"timer routines ... "
);
return
0
;
}
int32_t
syncNodeOnTimeoutCb
(
SSyncNode
*
ths
,
SyncTimeout
*
pMsg
)
{
int32_t
ret
=
0
;
syncTimeoutLog2
(
"==syncNodeOnTimeoutCb=="
,
pMsg
);
...
...
@@ -24,8 +29,11 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_PING
)
{
if
(
atomic_load_64
(
&
ths
->
pingTimerLogicClockUser
)
<=
pMsg
->
logicClock
)
{
++
(
ths
->
pingTimerCounter
);
// syncNodePingAll(ths);
syncNodePingPeers
(
ths
);
// syncNodePingPeers(ths);
syncNodeTimerRoutine
(
ths
);
}
}
else
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_ELECTION
)
{
...
...
@@ -40,7 +48,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
syncNodeReplicate
(
ths
);
}
}
else
{
s
Trace
(
"unknown timeoutType:%d"
,
pMsg
->
timeoutType
);
s
Error
(
"vgId:%d, unknown timeout-type:%d"
,
ths
->
vgId
,
pMsg
->
timeoutType
);
}
return
ret
;
...
...
source/util/src/tlockfree.c
浏览文件 @
2e1d69ce
...
...
@@ -17,10 +17,8 @@
#include "tlockfree.h"
#define TD_RWLATCH_WRITE_FLAG 0x40000000
#define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000
void
taosInitRWLatch
(
SRWLatch
*
pLatch
)
{
*
pLatch
=
0
;
}
void
taosInitReentrantRWLatch
(
SRWLatch
*
pLatch
)
{
*
pLatch
=
TD_RWLATCH_REENTRANT_FLAG
;
}
void
taosWLockLatch
(
SRWLatch
*
pLatch
)
{
SRWLatch
oLatch
,
nLatch
;
...
...
@@ -28,14 +26,8 @@ void taosWLockLatch(SRWLatch *pLatch) {
// Set write flag
while
(
1
)
{
oLatch
=
atomic_load_
64
(
pLatch
);
oLatch
=
atomic_load_
32
(
pLatch
);
if
(
oLatch
&
TD_RWLATCH_WRITE_FLAG
)
{
if
(
oLatch
&
TD_RWLATCH_REENTRANT_FLAG
)
{
nLatch
=
(((
oLatch
>>
32
)
+
1
)
<<
32
)
|
(
oLatch
&
0xFFFFFFFF
);
if
(
atomic_val_compare_exchange_64
(
pLatch
,
oLatch
,
nLatch
)
==
oLatch
)
break
;
continue
;
}
nLoops
++
;
if
(
nLoops
>
1000
)
{
sched_yield
();
...
...
@@ -45,14 +37,14 @@ void taosWLockLatch(SRWLatch *pLatch) {
}
nLatch
=
oLatch
|
TD_RWLATCH_WRITE_FLAG
;
if
(
atomic_val_compare_exchange_
64
(
pLatch
,
oLatch
,
nLatch
)
==
oLatch
)
break
;
if
(
atomic_val_compare_exchange_
32
(
pLatch
,
oLatch
,
nLatch
)
==
oLatch
)
break
;
}
// wait for all reads end
nLoops
=
0
;
while
(
1
)
{
oLatch
=
atomic_load_
64
(
pLatch
);
if
(
0
==
(
oLatch
&
0xFFFFFFF
)
)
break
;
oLatch
=
atomic_load_
32
(
pLatch
);
if
(
oLatch
==
TD_RWLATCH_WRITE_FLAG
)
break
;
nLoops
++
;
if
(
nLoops
>
1000
)
{
sched_yield
();
...
...
@@ -64,47 +56,27 @@ void taosWLockLatch(SRWLatch *pLatch) {
// no reentrant
int32_t
taosWTryLockLatch
(
SRWLatch
*
pLatch
)
{
SRWLatch
oLatch
,
nLatch
;
oLatch
=
atomic_load_
64
(
pLatch
);
if
(
oLatch
<<
2
)
{
oLatch
=
atomic_load_
32
(
pLatch
);
if
(
oLatch
)
{
return
-
1
;
}
nLatch
=
oLatch
|
TD_RWLATCH_WRITE_FLAG
;
if
(
atomic_val_compare_exchange_
64
(
pLatch
,
oLatch
,
nLatch
)
==
oLatch
)
{
if
(
atomic_val_compare_exchange_
32
(
pLatch
,
oLatch
,
nLatch
)
==
oLatch
)
{
return
0
;
}
return
-
1
;
}
void
taosWUnLockLatch
(
SRWLatch
*
pLatch
)
{
SRWLatch
oLatch
,
nLatch
,
wLatch
;
while
(
1
)
{
oLatch
=
atomic_load_64
(
pLatch
);
if
(
0
==
(
oLatch
&
TD_RWLATCH_REENTRANT_FLAG
))
{
atomic_store_64
(
pLatch
,
0
);
break
;
}
wLatch
=
((
oLatch
<<
2
)
>>
34
);
if
(
wLatch
)
{
nLatch
=
((
--
wLatch
)
<<
32
)
|
TD_RWLATCH_REENTRANT_FLAG
|
TD_RWLATCH_WRITE_FLAG
;
}
else
{
nLatch
=
TD_RWLATCH_REENTRANT_FLAG
;
}
if
(
atomic_val_compare_exchange_64
(
pLatch
,
oLatch
,
nLatch
)
==
oLatch
)
break
;
}
}
void
taosWUnLockLatch
(
SRWLatch
*
pLatch
)
{
atomic_store_32
(
pLatch
,
0
);
}
void
taosRLockLatch
(
SRWLatch
*
pLatch
)
{
SRWLatch
oLatch
,
nLatch
;
int32_t
nLoops
=
0
;
while
(
1
)
{
oLatch
=
atomic_load_
64
(
pLatch
);
oLatch
=
atomic_load_
32
(
pLatch
);
if
(
oLatch
&
TD_RWLATCH_WRITE_FLAG
)
{
nLoops
++
;
if
(
nLoops
>
1000
)
{
...
...
@@ -115,8 +87,8 @@ void taosRLockLatch(SRWLatch *pLatch) {
}
nLatch
=
oLatch
+
1
;
if
(
atomic_val_compare_exchange_
64
(
pLatch
,
oLatch
,
nLatch
)
==
oLatch
)
break
;
if
(
atomic_val_compare_exchange_
32
(
pLatch
,
oLatch
,
nLatch
)
==
oLatch
)
break
;
}
}
void
taosRUnLockLatch
(
SRWLatch
*
pLatch
)
{
atomic_fetch_sub_
64
(
pLatch
,
1
);
}
void
taosRUnLockLatch
(
SRWLatch
*
pLatch
)
{
atomic_fetch_sub_
32
(
pLatch
,
1
);
}
tests/script/tsim/sync/electTest.sim
0 → 100644
浏览文件 @
2e1d69ce
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
if $data[0][0] != 1 then
return -1
endi
if $data[0][4] != ready then
goto check_dnode_ready
endi
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$loop_cnt = 0
check_dnode_ready_1:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnodes not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
if $data[0][4] != ready then
goto check_dnode_ready_1
endi
if $data[1][4] != ready then
goto check_dnode_ready_1
endi
if $data[2][4] != ready then
goto check_dnode_ready_1
endi
if $data[3][4] != ready then
goto check_dnode_ready_1
endi
$replica = 3
$vgroups = 1
print ============= create database
sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0
check_db_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> db not ready!
return -1
endi
sql show databases
print ===> rows: $rows
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
if $rows != 3 then
return -1
endi
if $data[2][19] != ready then
goto check_db_ready
endi
sql use db
$loop_cnt = 0
check_vg_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
if $rows != $vgroups then
return -1
endi
if $data[0][4] == leader then
if $data[0][6] == follower then
if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi
endi
elif $data[0][6] == leader then
if $data[0][4] == follower then
if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi
endi
elif $data[0][8] == leader then
if $data[0][4] == follower then
if $data[0][6] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi
endi
else
goto check_vg_ready
endi
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
sql create table ct1 using stb tags(1000)
print ===> write 100 records
$N = 100
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
#sql flush database db;
sleep 3000
print ===> stop dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
print ===> start dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
#########################################################
tests/system-test/2-query/count_partition.py
0 → 100644
浏览文件 @
2e1d69ce
# author : wenzhouwww
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
self
.
row_nums
=
10
self
.
tb_nums
=
10
self
.
ts
=
1537146000000
def
prepare_datas
(
self
,
stb_name
,
tb_nums
,
row_nums
):
tdSql
.
execute
(
" use db "
)
tdSql
.
execute
(
f
" create stable
{
stb_name
}
(ts timestamp , c1 int , c2 bigint , c3 float , c4 double , c5 smallint , c6 tinyint , c7 bool , c8 binary(36) , c9 nchar(36) , uc1 int unsigned,
\
uc2 bigint unsigned ,uc3 smallint unsigned , uc4 tinyint unsigned ) tags(t1 timestamp , t2 int , t3 bigint , t4 float , t5 double , t6 smallint , t7 tinyint , t8 bool , t9 binary(36)
\
, t10 nchar(36) , t11 int unsigned , t12 bigint unsigned ,t13 smallint unsigned , t14 tinyint unsigned ) "
)
for
i
in
range
(
tb_nums
):
tbname
=
f
"sub_
{
stb_name
}
_
{
i
}
"
ts
=
self
.
ts
+
i
*
10000
tdSql
.
execute
(
f
"create table
{
tbname
}
using
{
stb_name
}
tags (
{
ts
}
,
{
i
}
,
{
i
}
*10 ,
{
i
}
*1.0,
{
i
}
*1.0 , 1 , 2, 'true', 'binary_
{
i
}
' ,'nchar_
{
i
}
',
{
i
}
,
{
i
}
,10,20 )"
)
for
row
in
range
(
row_nums
):
ts
=
self
.
ts
+
row
*
1000
tdSql
.
execute
(
f
"insert into
{
tbname
}
values(
{
ts
}
,
{
row
}
,
{
row
}
,
{
row
}
,
{
row
}
, 1 , 2 , 'true' , 'binary_
{
row
}
' , 'nchar_
{
row
}
' ,
{
row
}
,
{
row
}
, 1 ,2 )"
)
for
null
in
range
(
5
):
ts
=
self
.
ts
+
row_nums
*
1000
+
null
*
1000
tdSql
.
execute
(
f
"insert into
{
tbname
}
values(
{
ts
}
, NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL )"
)
def
basic_query
(
self
):
tdSql
.
query
(
"select count(*) from stb"
)
tdSql
.
checkData
(
0
,
0
,(
self
.
row_nums
+
5
)
*
self
.
tb_nums
)
tdSql
.
query
(
"select count(c1) from stb"
)
tdSql
.
checkData
(
0
,
0
,(
self
.
row_nums
)
*
self
.
tb_nums
)
tdSql
.
query
(
" select tbname , count(*) from stb partition by tbname "
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
query
(
" select count(c1) from stb group by t1 order by t1 "
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
error
(
" select count(c1) from stb group by c1 order by t1 "
)
tdSql
.
error
(
" select count(t1) from stb group by c1 order by t1 "
)
tdSql
.
query
(
" select count(c1) from stb group by tbname order by tbname "
)
tdSql
.
checkRows
(
self
.
tb_nums
)
# bug need fix
# tdSql.query(" select count(t1) from stb group by t2 order by t2 ")
# tdSql.checkRows(self.tb_nums)
tdSql
.
query
(
" select count(c1) from stb group by c1 order by c1 "
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
" select c1 , count(c1) from stb group by c1 order by c1 "
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select count(c1) from stb group by abs(c1) order by abs(c1)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select abs(c1+c3), count(c1+c3) from stb group by abs(c1+c3) order by abs(c1+c3)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select count(c1+c3)+max(c2) ,abs(c1) from stb group by abs(c1) order by abs(c1)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
error
(
"select count(c1+c3)+max(c2) ,abs(c1) ,abs(t1) from stb group by abs(c1) order by abs(t1)+c2"
)
tdSql
.
error
(
"select count(c1+c3)+max(c2) ,abs(c1) from stb group by abs(c1) order by abs(c1)+c2"
)
tdSql
.
query
(
"select abs(c1+c3)+abs(c2) , count(c1+c3)+count(c2) from stb group by abs(c1+c3)+abs(c2) order by abs(c1+c3)+abs(c2)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select count(c1) , count(t2) from stb where abs(c1+t2)=1 partition by tbname"
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
"select count(c1) from stb where abs(c1+t2)=1 partition by tbname"
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
"select tbname , count(c1) from stb partition by tbname order by tbname"
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
)
tdSql
.
error
(
"select tbname , count(c1) from stb partition by t1 order by t1"
)
tdSql
.
error
(
"select tbname , count(t1) from stb partition by t1 order by t1"
)
tdSql
.
error
(
"select tbname , count(t1) from stb partition by t2 order by t2"
)
# # bug need fix
# tdSql.query("select t2 , count(t1) from stb partition by t2 order by t2")
# tdSql.checkRows(self.tb_nums)
tdSql
.
query
(
"select tbname , count(c1) from stb partition by tbname order by tbname"
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
)
tdSql
.
error
(
"select tbname , count(c1) from stb partition by t2 order by t2"
)
tdSql
.
query
(
"select c2, count(c1) from stb partition by c2 order by c2 desc"
)
tdSql
.
checkRows
(
self
.
tb_nums
+
1
)
tdSql
.
checkData
(
0
,
1
,
self
.
tb_nums
)
tdSql
.
error
(
"select tbname , count(c1) from stb partition by c1 order by c2"
)
tdSql
.
query
(
"select tbname , abs(t2) from stb partition by c2 order by t2"
)
tdSql
.
checkRows
(
self
.
tb_nums
*
(
self
.
row_nums
+
5
))
tdSql
.
query
(
"select count(c1) , count(t2) from stb partition by c2 "
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
)
tdSql
.
query
(
"select count(c1) , count(t2) ,c2 from stb partition by c2 order by c2"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select count(c1) , count(t1) ,max(c2) ,tbname from stb partition by tbname order by tbname"
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
checkCols
(
4
)
tdSql
.
query
(
"select count(c1) , count(t2) ,t1 from stb partition by t1 order by t1"
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
checkData
(
0
,
0
,
self
.
row_nums
)
# bug need fix
# tdSql.query("select count(c1) , count(t1) ,abs(c1) from stb partition by abs(c1) order by abs(c1)")
# tdSql.checkRows(self.row_nums+1)
tdSql
.
query
(
"select count(ceil(c2)) , count(floor(t2)) ,count(floor(c2)) from stb partition by abs(c2) order by abs(c2)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select count(ceil(c1-2)) , count(floor(t2+1)) ,max(c2-c1) from stb partition by abs(floor(c1)) order by abs(floor(c1))"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
# interval
tdSql
.
query
(
"select count(c1) from stb interval(2s) sliding(1s)"
)
# bug need fix
tdSql
.
query
(
'select max(c1) from stb where ts>="2022-07-06 16:00:00.000 " and ts < "2022-07-06 17:00:00.000 " interval(50s) sliding(30s) fill(NULL)'
)
tdSql
.
query
(
" select tbname , count(c1) from stb partition by tbname interval(10s) slimit 5 soffset 1 "
)
tdSql
.
query
(
"select tbname , count(c1) from stb partition by tbname interval(10s)"
)
tdSql
.
query
(
"select tbname , count(c1) from sub_stb_1 partition by tbname interval(10s)"
)
tdSql
.
checkData
(
0
,
0
,
'sub_stb_1'
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
)
# tdSql.query(" select tbname , count(c1) from stb partition by tbname order by tbname slimit 5 soffset 0 ")
# tdSql.checkRows(5)
# tdSql.query(" select tbname , count(c1) from stb partition by tbname order by tbname slimit 5 soffset 1 ")
# tdSql.checkRows(5)
tdSql
.
query
(
" select tbname , count(c1) from sub_stb_1 partition by tbname interval(10s) sliding(5s) "
)
tdSql
.
query
(
f
'select max(c1) from stb where ts>=
{
self
.
ts
}
and ts <
{
self
.
ts
}
+10000 partition by tbname interval(50s) sliding(30s)'
)
tdSql
.
query
(
f
'select max(c1) from stb where ts>=
{
self
.
ts
}
and ts <
{
self
.
ts
}
+10000 interval(50s) sliding(30s)'
)
tdSql
.
query
(
f
'select tbname , count(c1) from stb where ts>=
{
self
.
ts
}
and ts <
{
self
.
ts
}
+10000 partition by tbname interval(50s) sliding(30s)'
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
prepare_datas
(
"stb"
,
self
.
tb_nums
,
self
.
row_nums
)
self
.
basic_query
()
# # coverage case for taosd crash about bug fix
tdSql
.
query
(
" select sum(c1) from stb where t2+10 >1 "
)
tdSql
.
query
(
" select count(c1),count(t1) from stb where -t2<1 "
)
tdSql
.
query
(
" select tbname ,max(ceil(c1)) from stb group by tbname "
)
tdSql
.
query
(
" select avg(abs(c1)) , tbname from stb group by tbname "
)
tdSql
.
query
(
" select t1,c1 from stb where abs(t2+c1)=1 "
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
\ No newline at end of file
tests/system-test/2-query/max_partition.py
0 → 100644
浏览文件 @
2e1d69ce
# author : wenzhouwww
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
self
.
row_nums
=
10
self
.
tb_nums
=
10
self
.
ts
=
1537146000000
def
prepare_datas
(
self
,
stb_name
,
tb_nums
,
row_nums
):
tdSql
.
execute
(
" use db "
)
tdSql
.
execute
(
f
" create stable
{
stb_name
}
(ts timestamp , c1 int , c2 bigint , c3 float , c4 double , c5 smallint , c6 tinyint , c7 bool , c8 binary(36) , c9 nchar(36) , uc1 int unsigned,
\
uc2 bigint unsigned ,uc3 smallint unsigned , uc4 tinyint unsigned ) tags(t1 timestamp , t2 int , t3 bigint , t4 float , t5 double , t6 smallint , t7 tinyint , t8 bool , t9 binary(36)
\
, t10 nchar(36) , t11 int unsigned , t12 bigint unsigned ,t13 smallint unsigned , t14 tinyint unsigned ) "
)
for
i
in
range
(
tb_nums
):
tbname
=
f
"sub_
{
stb_name
}
_
{
i
}
"
ts
=
self
.
ts
+
i
*
10000
tdSql
.
execute
(
f
"create table
{
tbname
}
using
{
stb_name
}
tags (
{
ts
}
,
{
i
}
,
{
i
}
*10 ,
{
i
}
*1.0,
{
i
}
*1.0 , 1 , 2, 'true', 'binary_
{
i
}
' ,'nchar_
{
i
}
',
{
i
}
,
{
i
}
,10,20 )"
)
for
row
in
range
(
row_nums
):
ts
=
self
.
ts
+
row
*
1000
tdSql
.
execute
(
f
"insert into
{
tbname
}
values(
{
ts
}
,
{
row
}
,
{
row
}
,
{
row
}
,
{
row
}
, 1 , 2 , 'true' , 'binary_
{
row
}
' , 'nchar_
{
row
}
' ,
{
row
}
,
{
row
}
, 1 ,2 )"
)
for
null
in
range
(
5
):
ts
=
self
.
ts
+
row_nums
*
1000
+
null
*
1000
tdSql
.
execute
(
f
"insert into
{
tbname
}
values(
{
ts
}
, NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL )"
)
def
basic_query
(
self
):
tdSql
.
query
(
"select count(*) from stb"
)
tdSql
.
checkData
(
0
,
0
,(
self
.
row_nums
+
5
)
*
self
.
tb_nums
)
tdSql
.
query
(
"select max(c1) from stb"
)
tdSql
.
checkData
(
0
,
0
,(
self
.
row_nums
-
1
))
tdSql
.
query
(
" select tbname , max(c1) from stb partition by tbname "
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
query
(
" select max(c1) from stb group by t1 order by t1 "
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
query
(
" select max(c1) from stb group by c1 order by t1 "
)
tdSql
.
query
(
" select max(t2) from stb group by c1 order by t1 "
)
tdSql
.
query
(
" select max(c1) from stb group by tbname order by tbname "
)
tdSql
.
checkRows
(
self
.
tb_nums
)
# bug need fix
# tdSql.query(" select max(t1) from stb group by t2 order by t2 ")
# tdSql.checkRows(self.tb_nums)
tdSql
.
query
(
" select max(c1) from stb group by c1 order by c1 "
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
" select c1 , max(c1) from stb group by c1 order by c1 "
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
# support selective functions
tdSql
.
query
(
" select c1 ,c2 ,c3 , max(c1) ,c4 ,c5 ,t11 from stb group by c1 order by c1 desc "
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
" select c1, tbname , max(c1) ,c4 ,c5 ,t11 from stb group by c1 order by c1 desc "
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
# bug need fix
# tdSql.query(" select tbname , max(c1) from sub_stb_1 where c1 is null group by c1 order by c1 desc ")
# tdSql.checkRows(1)
# tdSql.checkData(0,0,"sub_stb_1")
tdSql
.
query
(
"select max(c1) ,c2 ,t2,tbname from stb group by abs(c1) order by abs(c1)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select abs(c1+c3), count(c1+c3) ,max(c1+t2) from stb group by abs(c1+c3) order by abs(c1+c3)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select max(c1+c3)+min(c2) ,abs(c1) from stb group by abs(c1) order by abs(c1)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
error
(
"select count(c1+c3)+max(c2) ,abs(c1) ,abs(t1) from stb group by abs(c1) order by abs(t1)+c2"
)
tdSql
.
error
(
"select count(c1+c3)+max(c2) ,abs(c1) from stb group by abs(c1) order by abs(c1)+c2"
)
tdSql
.
query
(
"select abs(c1+c3)+abs(c2) , count(c1+c3)+max(c2) from stb group by abs(c1+c3)+abs(c2) order by abs(c1+c3)+abs(c2)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
" select max(c1) , max(t2) from stb where abs(c1+t2)=1 partition by tbname "
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
" select max(c1) from stb where abs(c1+t2)=1 partition by tbname "
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
" select tbname , max(c1) from stb partition by tbname order by tbname "
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
-
1
)
tdSql
.
query
(
"select tbname , max(c2) from stb partition by t1 order by t1"
)
tdSql
.
query
(
"select tbname , max(t2) from stb partition by t1 order by t1"
)
tdSql
.
query
(
"select tbname , max(t2) from stb partition by t2 order by t2"
)
# # bug need fix
# tdSql.query("select t2 , max(t2) from stb partition by t2 order by t2")
# tdSql.checkRows(self.tb_nums)
tdSql
.
query
(
"select tbname , max(c1) from stb partition by tbname order by tbname"
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
-
1
)
tdSql
.
query
(
"select tbname , max(c1) from stb partition by t2 order by t2"
)
tdSql
.
query
(
"select c2, max(c1) from stb partition by c2 order by c2 desc"
)
tdSql
.
checkRows
(
self
.
tb_nums
+
1
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
-
1
)
tdSql
.
query
(
"select tbname , max(c1) from stb partition by c1 order by c2"
)
tdSql
.
query
(
"select tbname , abs(t2) from stb partition by c2 order by t2"
)
tdSql
.
checkRows
(
self
.
tb_nums
*
(
self
.
row_nums
+
5
))
tdSql
.
query
(
"select max(c1) , count(t2) from stb partition by c2 "
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
)
tdSql
.
query
(
"select count(c1) , max(t2) ,c2 from stb partition by c2 order by c2"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select count(c1) , count(t1) ,max(c2) ,tbname from stb partition by tbname order by tbname"
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
checkCols
(
4
)
tdSql
.
query
(
"select count(c1) , max(t2) ,t1 from stb partition by t1 order by t1"
)
tdSql
.
checkRows
(
self
.
tb_nums
)
tdSql
.
checkData
(
0
,
0
,
self
.
row_nums
)
# bug need fix
# tdSql.query("select count(c1) , max(t1) ,abs(c1) from stb partition by abs(c1) order by abs(c1)")
# tdSql.checkRows(self.row_nums+1)
tdSql
.
query
(
"select max(ceil(c2)) , max(floor(t2)) ,max(floor(c2)) from stb partition by abs(c2) order by abs(c2)"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
tdSql
.
query
(
"select max(ceil(c1-2)) , max(floor(t2+1)) ,max(c2-c1) from stb partition by abs(floor(c1)) order by abs(floor(c1))"
)
tdSql
.
checkRows
(
self
.
row_nums
+
1
)
# interval
tdSql
.
query
(
"select max(c1) from stb interval(2s) sliding(1s)"
)
# bug need fix
tdSql
.
query
(
'select max(c1) from stb where ts>="2022-07-06 16:00:00.000 " and ts < "2022-07-06 17:00:00.000 " interval(50s) sliding(30s) fill(NULL)'
)
tdSql
.
query
(
" select tbname , count(c1) from stb partition by tbname interval(10s) slimit 5 soffset 1 "
)
tdSql
.
query
(
"select tbname , max(c1) from stb partition by tbname interval(10s)"
)
tdSql
.
checkRows
(
self
.
row_nums
*
2
)
tdSql
.
query
(
"select tbname , count(c1) from sub_stb_1 partition by tbname interval(10s)"
)
tdSql
.
checkData
(
0
,
0
,
'sub_stb_1'
)
tdSql
.
checkData
(
0
,
1
,
self
.
row_nums
)
# bug need fix
# tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname slimit 5 soffset 0 ")
# tdSql.checkRows(5)
# tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname slimit 5 soffset 1 ")
# tdSql.checkRows(5)
tdSql
.
query
(
" select tbname , max(c1) from sub_stb_1 partition by tbname interval(10s) sliding(5s) "
)
tdSql
.
query
(
f
'select max(c1) from stb where ts>=
{
self
.
ts
}
and ts <
{
self
.
ts
}
+1000 interval(50s) sliding(30s)'
)
tdSql
.
query
(
f
'select tbname , max(c1) from stb where ts>=
{
self
.
ts
}
and ts <
{
self
.
ts
}
+1000 interval(50s) sliding(30s)'
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
prepare_datas
(
"stb"
,
self
.
tb_nums
,
self
.
row_nums
)
self
.
basic_query
()
# # coverage case for taosd crash about bug fix
tdSql
.
query
(
" select sum(c1) from stb where t2+10 >1 "
)
tdSql
.
query
(
" select count(c1),count(t1) from stb where -t2<1 "
)
tdSql
.
query
(
" select tbname ,max(ceil(c1)) from stb group by tbname "
)
tdSql
.
query
(
" select avg(abs(c1)) , tbname from stb group by tbname "
)
tdSql
.
query
(
" select t1,c1 from stb where abs(t2+c1)=1 "
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
\ No newline at end of file
tests/system-test/7-tmq/tmqAutoCreateTbl.py
浏览文件 @
2e1d69ce
...
...
@@ -16,9 +16,9 @@ from tmqCommon import *
class
TDTestCase
:
def
__init__
(
self
):
self
.
vgroups
=
2
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
1000
0
self
.
vgroups
=
4
self
.
ctbNum
=
100
0
self
.
rowsPerTbl
=
1000
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
...
...
@@ -29,7 +29,7 @@ class TDTestCase:
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
3
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
...
...
@@ -37,14 +37,14 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
5
00
,
'ctbNum'
:
10
00
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
5
00
,
'batchNum'
:
4
00
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
...
...
@@ -54,20 +54,21 @@ class TDTestCase:
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
# tdLog.info("create ctb")
# tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
# ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("insert data")
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctbx"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
# 自动建表完成数据插入,启动消费
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
...
...
@@ -90,28 +91,23 @@ class TDTestCase:
'showRow'
:
1
,
'snapshot'
:
1
}
#
paraDict['vgroups'] = self.vgroups
#
paraDict['ctbNum'] = self.ctbNum
#
paraDict['rowsPerTbl'] = self.rowsPerTbl
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
# tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
# tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
# tdLog.info("insert data by auto create ctb")
# tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
execute
(
sqlString
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
...
...
@@ -120,19 +116,12 @@ class TDTestCase:
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:
5
00,\
auto.commit.interval.ms:
10
00,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# time.sleep(3)
tmqCom
.
getStartCommitNotifyFromTmqsim
()
tdLog
.
info
(
"================= restart dnode ==========================="
)
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
...
...
@@ -172,23 +161,23 @@ class TDTestCase:
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
'snapshot'
:
0
}
#
paraDict['vgroups'] = self.vgroups
#
paraDict['ctbNum'] = self.ctbNum
#
paraDict['rowsPerTbl'] = self.rowsPerTbl
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
#
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
#
tdLog.info("create stb")
#
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
#
tdLog.info("create ctb")
#
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
#
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
#
tdLog.info("insert data")
#
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
#
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
#
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
...
...
@@ -211,14 +200,8 @@ class TDTestCase:
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"create some new child table and insert data "
)
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctb"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctb
y
"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tmqCom
.
getStartCommitNotifyFromTmqsim
()
tdLog
.
info
(
"================= restart dnode ==========================="
)
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
...
...
@@ -237,91 +220,13 @@ class TDTestCase:
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
# 自动建表完成数据插入,启动消费
def
tmqCase3
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 3: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
400
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"insert data by auto create ctb"
)
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctb"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
topicList
=
topicFromStb1
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# tdLog.info("================= restart dnode ===========================")
# tdDnodes.stop(1)
# tdDnodes.start(1)
# time.sleep(2)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
if
totalConsumeRows
!=
totalRowsInserted
:
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
totalRowsInserted
))
tdLog
.
exit
(
"tmq consume rows error!"
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 3 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
#
self.tmqCase1()
# self.tmqCase2()
self
.
tmqCase3
()
self
.
prepareTestEnv
()
self
.
tmqCase1
()
# self.tmqCase2()
TD-17267
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/7-tmq/tmqDnodeRestart.py
0 → 100644
浏览文件 @
2e1d69ce
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
vgroups
=
2
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
10000
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
3
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
500
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
500
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
400
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
# paraDict['vgroups'] = self.vgroups
# paraDict['ctbNum'] = self.ctbNum
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
topicList
=
topicFromStb1
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:500,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# time.sleep(3)
tmqCom
.
getStartCommitNotifyFromTmqsim
()
tdLog
.
info
(
"================= restart dnode ==========================="
)
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
if
totalConsumeRows
!=
totalRowsInserted
:
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
totalRowsInserted
))
tdLog
.
exit
(
"tmq consume rows error!"
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
1000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
# paraDict['vgroups'] = self.vgroups
# paraDict['ctbNum'] = self.ctbNum
# paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
topicList
=
topicFromStb1
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"create some new child table and insert data "
)
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
"ctb"
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tmqCom
.
getStartCommitNotifyFromTmqsim
()
tdLog
.
info
(
"================= restart dnode ==========================="
)
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
5
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
if
totalConsumeRows
!=
totalRowsInserted
:
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
totalRowsInserted
))
tdLog
.
exit
(
"tmq consume rows error!"
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
tmqCase1
()
self
.
tmqCase2
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
2e1d69ce
...
...
@@ -118,9 +118,11 @@ python3 ./test.py -f 2-query/distribute_agg_stddev.py
python3 ./test.py
-f
2-query/twa.py
python3 ./test.py
-f
2-query/irate.py
python3 ./test.py
-f
2-query/and_or_for_byte.py
python3 ./test.py
-f
2-query/count_partition.py
python3 ./test.py
-f
2-query/function_null.py
#python3 ./test.py -f 2-query/queryQnode.py
python3 ./test.py
-f
2-query/queryQnode.py
python3 ./test.py
-f
2-query/max_partition.py
python3 ./test.py
-f
6-cluster/5dnode1mnode.py
#BUG python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
...
...
@@ -174,8 +176,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqAutoCreateTbl.py
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
#------------querPolicy 2-----------
...
...
@@ -263,6 +265,8 @@ python3 ./test.py -f 2-query/distribute_agg_stddev.py -Q 2
python3 ./test.py
-f
2-query/twa.py
-Q
2
python3 ./test.py
-f
2-query/irate.py
-Q
2
python3 ./test.py
-f
2-query/function_null.py
-Q
2
python3 ./test.py
-f
2-query/count_partition.py
-Q
2
python3 ./test.py
-f
2-query/max_partition.py
-Q
2
#------------querPolicy 3-----------
...
...
@@ -348,3 +352,5 @@ python3 ./test.py -f 2-query/distribute_agg_stddev.py -Q 3
python3 ./test.py
-f
2-query/twa.py
-Q
3
python3 ./test.py
-f
2-query/irate.py
-Q
3
python3 ./test.py
-f
2-query/function_null.py
-Q
3
python3 ./test.py
-f
2-query/count_partition.py
-Q
3
python3 ./test.py
-f
2-query/max_partition.py
-Q
3
\ No newline at end of file
taosws-rs
@
6dccac19
比较
7a94ffab
...
6dccac19
Subproject commit
7a94ffab45f08e16f09b3f430fe75d717054adb6
Subproject commit
6dccac192a2ae7dd78718ab926201aab5419327a
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录