Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e09ddf75
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e09ddf75
编写于
7月 01, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/dnode
上级
f2468013
f01e5e4b
变更
23
显示空白变更内容
内联
并排
Showing
23 changed file
with
546 addition
and
184 deletion
+546
-184
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+1
-0
include/os/osMath.h
include/os/osMath.h
+7
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+2
-2
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+2
-2
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+1
-1
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+3
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+4
-7
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+108
-0
source/libs/function/src/taggfunction.c
source/libs/function/src/taggfunction.c
+2
-2
source/libs/function/src/tpercentile.c
source/libs/function/src/tpercentile.c
+1
-1
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+4
-4
source/libs/parser/src/parInsertData.c
source/libs/parser/src/parInsertData.c
+2
-2
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+2
-2
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+1
-1
source/os/src/osMath.c
source/os/src/osMath.c
+47
-0
source/util/src/tarray.c
source/util/src/tarray.c
+2
-2
source/util/src/tdigest.c
source/util/src/tdigest.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-1
tests/system-test/0-others/udfTest.py
tests/system-test/0-others/udfTest.py
+48
-44
tests/system-test/0-others/udf_create.py
tests/system-test/0-others/udf_create.py
+48
-44
tests/system-test/0-others/udf_restart_taosd.py
tests/system-test/0-others/udf_restart_taosd.py
+50
-46
tests/system-test/1-insert/test_stmt_insert_query_ex.py
tests/system-test/1-insert/test_stmt_insert_query_ex.py
+12
-12
tests/system-test/7-tmq/tmqConsFromTsdb.py
tests/system-test/7-tmq/tmqConsFromTsdb.py
+197
-10
未找到文件。
include/libs/function/functionMgt.h
浏览文件 @
e09ddf75
...
...
@@ -34,6 +34,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_ELAPSED
,
FUNCTION_TYPE_IRATE
,
FUNCTION_TYPE_LAST_ROW
,
FUNCTION_TYPE_LAST_ROWT
,
//TODO: removed
FUNCTION_TYPE_MAX
,
FUNCTION_TYPE_MIN
,
FUNCTION_TYPE_MODE
,
...
...
include/os/osMath.h
浏览文件 @
e09ddf75
...
...
@@ -60,6 +60,13 @@ extern "C" {
})
#endif
#ifndef __COMPAR_FN_T
#define __COMPAR_FN_T
typedef
int32_t
(
*
__compar_fn_t
)(
const
void
*
,
const
void
*
);
#endif
void
taosSort
(
void
*
arr
,
int64_t
sz
,
int64_t
width
,
__compar_fn_t
compar
);
#ifdef __cplusplus
}
#endif
...
...
source/common/src/tdatablock.c
浏览文件 @
e09ddf75
...
...
@@ -892,7 +892,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
int64_t
p0
=
taosGetTimestampUs
();
__compar_fn_t
fn
=
getKeyComparFunc
(
pColInfoData
->
info
.
type
,
pOrder
->
order
);
qs
ort
(
pColInfoData
->
pData
,
pDataBlock
->
info
.
rows
,
pColInfoData
->
info
.
bytes
,
fn
);
taosS
ort
(
pColInfoData
->
pData
,
pDataBlock
->
info
.
rows
,
pColInfoData
->
info
.
bytes
,
fn
);
int64_t
p1
=
taosGetTimestampUs
();
uDebug
(
"blockDataSort easy cost:%"
PRId64
", rows:%d
\n
"
,
p1
-
p0
,
pDataBlock
->
info
.
rows
);
...
...
@@ -1218,7 +1218,7 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
dst
->
pDataBlock
,
i
);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
src
->
pDataBlock
,
i
);
if
(
pSrc
->
pData
==
NULL
)
{
if
(
pSrc
->
pData
==
NULL
&&
(
!
IS_VAR_DATA_TYPE
(
pSrc
->
info
.
type
))
)
{
continue
;
}
...
...
source/common/src/tdataformat.c
浏览文件 @
e09ddf75
...
...
@@ -931,9 +931,9 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
// sort
if
(
isJson
)
{
qs
ort
(
pArray
->
pData
,
nTag
,
sizeof
(
STagVal
),
tTagValJsonCmprFn
);
taosS
ort
(
pArray
->
pData
,
nTag
,
sizeof
(
STagVal
),
tTagValJsonCmprFn
);
}
else
{
qs
ort
(
pArray
->
pData
,
nTag
,
sizeof
(
STagVal
),
tTagValCmprFn
);
taosS
ort
(
pArray
->
pData
,
nTag
,
sizeof
(
STagVal
),
tTagValCmprFn
);
}
// get size
...
...
source/libs/executor/src/executil.c
浏览文件 @
e09ddf75
...
...
@@ -118,7 +118,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
if
(
order
==
TSDB_ORDER_ASC
||
order
==
TSDB_ORDER_DESC
)
{
__compar_fn_t
fn
=
(
order
==
TSDB_ORDER_ASC
)
?
resultrowComparAsc
:
resultrowComparDesc
;
qs
ort
(
pGroupResInfo
->
pRows
->
pData
,
taosArrayGetSize
(
pGroupResInfo
->
pRows
),
POINTER_BYTES
,
fn
);
taosS
ort
(
pGroupResInfo
->
pRows
->
pData
,
taosArrayGetSize
(
pGroupResInfo
->
pRows
),
POINTER_BYTES
,
fn
);
}
pGroupResInfo
->
index
=
0
;
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
e09ddf75
...
...
@@ -117,6 +117,9 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t
lastCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
int32_t
getFirstLastInfoSize
(
int32_t
resBytes
);
int32_t
lastRowFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastRowFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getTopBotFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
);
bool
getTopBotMergeFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
);
bool
topBotFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
e09ddf75
...
...
@@ -1953,16 +1953,13 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
},
{
.
name
=
"last_row"
,
.
type
=
FUNCTION_TYPE_LAST_ROW
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_MULTI_RES_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
type
=
FUNCTION_TYPE_LAST_ROW
T
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_
SELECT_FUNC
|
FUNC_MGT_
MULTI_RES_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateFirstLast
,
.
getEnvFunc
=
getFirstLastFuncEnv
,
.
initFunc
=
functionSetup
,
.
processFunc
=
lastFunction
,
.
finalizeFunc
=
firstLastFinalize
,
.
pPartialFunc
=
"_last_partial"
,
.
pMergeFunc
=
"_last_merge"
,
.
combineFunc
=
lastCombine
,
.
processFunc
=
lastRowFunction
,
.
finalizeFunc
=
lastRowFinalize
,
},
{
.
name
=
"_cache_last_row"
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
e09ddf75
...
...
@@ -80,6 +80,7 @@ typedef struct STopBotRes {
typedef
struct
SFirstLastRes
{
bool
hasResult
;
bool
isNull
;
//used for last_row function only
int32_t
bytes
;
char
buf
[];
}
SFirstLastRes
;
...
...
@@ -2763,6 +2764,113 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
lastRowFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
0
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SFirstLastRes
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
int32_t
type
=
pInputCol
->
info
.
type
;
int32_t
bytes
=
pInputCol
->
info
.
bytes
;
pInfo
->
bytes
=
bytes
;
SColumnDataAgg
*
pColAgg
=
(
pInput
->
colDataAggIsSet
)
?
pInput
->
pColumnDataAgg
[
0
]
:
NULL
;
TSKEY
startKey
=
getRowPTs
(
pInput
->
pPTS
,
0
);
TSKEY
endKey
=
getRowPTs
(
pInput
->
pPTS
,
pInput
->
totalRows
-
1
);
int32_t
blockDataOrder
=
(
startKey
<=
endKey
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
if
(
blockDataOrder
==
TSDB_ORDER_ASC
)
{
for
(
int32_t
i
=
pInput
->
numOfRows
+
pInput
->
startRowIndex
-
1
;
i
>=
pInput
->
startRowIndex
;
--
i
)
{
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
TSKEY
cts
=
getRowPTs
(
pInput
->
pPTS
,
i
);
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
pInfo
->
buf
)
<
cts
)
{
if
(
pInputCol
->
hasNull
&&
colDataIsNull
(
pInputCol
,
pInput
->
totalRows
,
i
,
pColAgg
))
{
pInfo
->
isNull
=
true
;
}
else
{
pInfo
->
isNull
=
false
;
if
(
IS_VAR_DATA_TYPE
(
type
))
{
bytes
=
varDataTLen
(
data
);
pInfo
->
bytes
=
bytes
;
}
memcpy
(
pInfo
->
buf
+
sizeof
(
TSKEY
),
data
,
bytes
);
}
*
(
TSKEY
*
)(
pInfo
->
buf
)
=
cts
;
numOfElems
++
;
//handle selectivity
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
STuplePos
*
pTuplePos
=
(
STuplePos
*
)(
pInfo
->
buf
+
bytes
+
sizeof
(
TSKEY
));
if
(
!
pInfo
->
hasResult
)
{
saveTupleData
(
pCtx
,
i
,
pCtx
->
pSrcBlock
,
pTuplePos
);
}
else
{
copyTupleData
(
pCtx
,
i
,
pCtx
->
pSrcBlock
,
pTuplePos
);
}
}
pInfo
->
hasResult
=
true
;
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo
->
numOfRes
=
1
;
}
break
;
}
}
else
{
// descending order
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
TSKEY
cts
=
getRowPTs
(
pInput
->
pPTS
,
i
);
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
pInfo
->
buf
)
<
cts
)
{
if
(
pInputCol
->
hasNull
&&
colDataIsNull
(
pInputCol
,
pInput
->
totalRows
,
i
,
pColAgg
))
{
pInfo
->
isNull
=
true
;
}
else
{
pInfo
->
isNull
=
false
;
if
(
IS_VAR_DATA_TYPE
(
type
))
{
bytes
=
varDataTLen
(
data
);
pInfo
->
bytes
=
bytes
;
}
memcpy
(
pInfo
->
buf
+
sizeof
(
TSKEY
),
data
,
bytes
);
}
*
(
TSKEY
*
)(
pInfo
->
buf
)
=
cts
;
numOfElems
++
;
//handle selectivity
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
STuplePos
*
pTuplePos
=
(
STuplePos
*
)(
pInfo
->
buf
+
bytes
+
sizeof
(
TSKEY
));
if
(
!
pInfo
->
hasResult
)
{
saveTupleData
(
pCtx
,
i
,
pCtx
->
pSrcBlock
,
pTuplePos
);
}
else
{
copyTupleData
(
pCtx
,
i
,
pCtx
->
pSrcBlock
,
pTuplePos
);
}
}
pInfo
->
hasResult
=
true
;
pResInfo
->
numOfRes
=
1
;
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
break
;
}
}
SET_VAL
(
pResInfo
,
numOfElems
,
1
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
lastRowFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SFirstLastRes
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
colDataAppend
(
pCol
,
pBlock
->
info
.
rows
,
pRes
->
buf
+
sizeof
(
TSKEY
),
pRes
->
isNull
);
//handle selectivity
STuplePos
*
pTuplePos
=
(
STuplePos
*
)(
pRes
->
buf
+
pRes
->
bytes
+
sizeof
(
TSKEY
));
setSelectivityValue
(
pCtx
,
pBlock
,
pTuplePos
,
pBlock
->
info
.
rows
);
return
pResInfo
->
numOfRes
;
}
bool
getDiffFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SDiffInfo
);
return
true
;
...
...
source/libs/function/src/taggfunction.c
浏览文件 @
e09ddf75
...
...
@@ -1887,10 +1887,10 @@ static void top_bottom_func_finalizer(SqlFunctionCtx *pCtx) {
// user specify the order of output by sort the result according to timestamp
if (pCtx->param[1].param.i == PRIMARYKEY_TIMESTAMP_COL_ID) {
__compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn;
qs
ort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
taosS
ort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
} else /*if (pCtx->param[1].param.i > PRIMARYKEY_TIMESTAMP_COL_ID)*/ {
__compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn;
qs
ort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
taosS
ort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
}
GET_TRUE_DATA_TYPE();
...
...
source/libs/function/src/tpercentile.c
浏览文件 @
e09ddf75
...
...
@@ -44,7 +44,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
offset
+=
(
int32_t
)(
pg
->
num
*
pMemBucket
->
bytes
);
}
qs
ort
(
buffer
->
data
,
pMemBucket
->
pSlots
[
slotIdx
].
info
.
size
,
pMemBucket
->
bytes
,
pMemBucket
->
comparFn
);
taosS
ort
(
buffer
->
data
,
pMemBucket
->
pSlots
[
slotIdx
].
info
.
size
,
pMemBucket
->
bytes
,
pMemBucket
->
comparFn
);
return
buffer
;
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
e09ddf75
...
...
@@ -790,11 +790,11 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
pColIdx
[
i
].
schemaColIdx
=
pColList
->
boundColumns
[
i
];
pColIdx
[
i
].
boundIdx
=
i
;
}
qs
ort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
schemaIdxCompar
);
taosS
ort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
schemaIdxCompar
);
for
(
col_id_t
i
=
0
;
i
<
pColList
->
numOfBound
;
++
i
)
{
pColIdx
[
i
].
finalIdx
=
i
;
}
qs
ort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
boundIdxCompar
);
taosS
ort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
boundIdxCompar
);
}
if
(
pColList
->
numOfCols
>
pColList
->
numOfBound
)
{
...
...
@@ -2232,11 +2232,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
pColIdx
[
i
].
schemaColIdx
=
pColList
->
boundColumns
[
i
];
pColIdx
[
i
].
boundIdx
=
i
;
}
qs
ort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
schemaIdxCompar
);
taosS
ort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
schemaIdxCompar
);
for
(
col_id_t
i
=
0
;
i
<
pColList
->
numOfBound
;
++
i
)
{
pColIdx
[
i
].
finalIdx
=
i
;
}
qs
ort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
boundIdxCompar
);
taosS
ort
(
pColIdx
,
pColList
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
boundIdxCompar
);
}
if
(
pColList
->
numOfCols
>
pColList
->
numOfBound
)
{
...
...
source/libs/parser/src/parInsertData.c
浏览文件 @
e09ddf75
...
...
@@ -295,7 +295,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
char
*
pBlockData
=
pBlocks
->
data
;
// todo. qsort is unstable, if timestamp is same, should get the last one
qs
ort
(
pBlockData
,
pBlocks
->
numOfRows
,
dataBuf
->
rowSize
,
rowDataCompar
);
taosS
ort
(
pBlockData
,
pBlocks
->
numOfRows
,
dataBuf
->
rowSize
,
rowDataCompar
);
int32_t
i
=
0
;
int32_t
j
=
1
;
...
...
@@ -365,7 +365,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKey
pBlkKeyTuple
=
pBlkKeyInfo
->
pKeyTuple
;
// todo. qsort is unstable, if timestamp is same, should get the last one
qs
ort
(
pBlkKeyTuple
,
nRows
,
sizeof
(
SBlockKeyTuple
),
rowDataComparStable
);
taosS
ort
(
pBlkKeyTuple
,
nRows
,
sizeof
(
SBlockKeyTuple
),
rowDataComparStable
);
pBlkKeyTuple
=
pBlkKeyInfo
->
pKeyTuple
;
int32_t
i
=
0
;
...
...
source/libs/scalar/src/filter.c
浏览文件 @
e09ddf75
...
...
@@ -2059,7 +2059,7 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t
}
if
(
colIdxi
>
1
)
{
qs
ort
(
colIdx
,
colIdxi
,
sizeof
(
uint32_t
),
getComparFunc
(
TSDB_DATA_TYPE_USMALLINT
,
0
));
taosS
ort
(
colIdx
,
colIdxi
,
sizeof
(
uint32_t
),
getComparFunc
(
TSDB_DATA_TYPE_USMALLINT
,
0
));
}
for
(
uint32_t
l
=
0
;
l
<
colIdxi
;
++
l
)
{
...
...
@@ -2294,7 +2294,7 @@ int32_t filterMergeGroups(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t *gR
return
TSDB_CODE_SUCCESS
;
}
qs
ort
(
gRes
,
*
gResNum
,
POINTER_BYTES
,
filterCompareGroupCtx
);
taosS
ort
(
gRes
,
*
gResNum
,
POINTER_BYTES
,
filterCompareGroupCtx
);
int32_t
pEnd
=
0
,
cStart
=
0
,
cEnd
=
0
;
uint32_t
pColNum
=
0
,
cColNum
=
0
;
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
e09ddf75
...
...
@@ -287,7 +287,7 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC
int32_t
code
=
0
;
if
(
NULL
==
pParamList
)
{
if
(
ctx
->
pBlockList
)
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
ctx
->
pBlockList
,
0
);
SSDataBlock
*
pBlock
=
taosArrayGet
P
(
ctx
->
pBlockList
,
0
);
*
rowNum
=
pBlock
->
info
.
rows
;
}
else
{
*
rowNum
=
1
;
...
...
source/os/src/osMath.c
0 → 100644
浏览文件 @
e09ddf75
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
#ifdef WINDOWS
void
swapStr
(
char
*
j
,
char
*
J
,
int
width
)
{
int
i
;
char
tmp
;
for
(
i
=
0
;
i
<
width
;
i
++
)
{
tmp
=
*
j
;
*
j
=
*
J
;
*
J
=
tmp
;
j
++
;
J
++
;
}
}
#endif
void
taosSort
(
void
*
arr
,
int64_t
sz
,
int64_t
width
,
__compar_fn_t
compar
)
{
#ifdef WINDOWS
int64_t
i
,
j
;
for
(
i
=
0
;
i
<
sz
-
1
;
i
++
)
{
for
(
j
=
0
;
j
<
sz
-
1
-
i
;
j
++
)
{
if
(
compar
((
char
*
)
arr
+
j
*
width
,
(
char
*
)
arr
+
(
j
+
1
)
*
width
)
>
0
.
00
)
{
swapStr
((
char
*
)
arr
+
j
*
width
,
(
char
*
)
arr
+
(
j
+
1
)
*
width
,
width
);
}
}
}
#else
qsort
(
arr
,
sz
,
width
,
compar
);
#endif
}
source/util/src/tarray.c
浏览文件 @
e09ddf75
...
...
@@ -373,7 +373,7 @@ void taosArraySort(SArray* pArray, __compar_fn_t compar) {
assert
(
pArray
!=
NULL
);
assert
(
compar
!=
NULL
);
qs
ort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
compar
);
taosS
ort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
compar
);
}
void
*
taosArraySearch
(
const
SArray
*
pArray
,
const
void
*
key
,
__compar_fn_t
comparFn
,
int32_t
flags
)
{
...
...
@@ -390,7 +390,7 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t
void
taosArraySortString
(
SArray
*
pArray
,
__compar_fn_t
comparFn
)
{
assert
(
pArray
!=
NULL
);
qs
ort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
comparFn
);
taosS
ort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
comparFn
);
}
char
*
taosArraySearchString
(
const
SArray
*
pArray
,
const
char
*
key
,
__compar_fn_t
comparFn
,
int32_t
flags
)
{
...
...
source/util/src/tdigest.c
浏览文件 @
e09ddf75
...
...
@@ -124,7 +124,7 @@ void tdigestCompress(TDigest *t) {
t
->
num_buffered_pts
=
0
;
t
->
total_weight
+=
unmerged_weight
;
qs
ort
(
unmerged_centroids
,
num_unmerged
,
sizeof
(
SCentroid
),
cmpCentroid
);
taosS
ort
(
unmerged_centroids
,
num_unmerged
,
sizeof
(
SCentroid
),
cmpCentroid
);
memset
(
&
args
,
0
,
sizeof
(
SMergeArgs
));
args
.
centroids
=
(
SCentroid
*
)
taosMemoryMalloc
((
size_t
)(
sizeof
(
SCentroid
)
*
t
->
size
));
memset
(
args
.
centroids
,
0
,
(
size_t
)(
sizeof
(
SCentroid
)
*
t
->
size
));
...
...
source/util/src/terror.c
浏览文件 @
e09ddf75
...
...
@@ -614,7 +614,7 @@ static int32_t taosCompareTaosError(const void* a, const void* b) {
static
TdThreadOnce
tsErrorInit
=
PTHREAD_ONCE_INIT
;
static
void
tsSortError
(
void
)
{
qs
ort
(
errors
,
sizeof
(
errors
)
/
sizeof
(
errors
[
0
]),
sizeof
(
errors
[
0
]),
taosCompareTaosError
);
taosS
ort
(
errors
,
sizeof
(
errors
)
/
sizeof
(
errors
[
0
]),
sizeof
(
errors
[
0
]),
taosCompareTaosError
);
}
const
char
*
tstrerror
(
int32_t
err
)
{
...
...
tests/system-test/0-others/udfTest.py
浏览文件 @
e09ddf75
...
...
@@ -313,9 +313,11 @@ class TDTestCase:
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select udf1(num1) , bottom(num1,1) from tb;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select udf1(num1) , last_row(num1) from tb;"
)
tdSql
.
query
(
"select udf1(num1) , last_row(num1) from tb;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select round(num1) , last_row(num1) from tb;"
)
tdSql
.
query
(
"select round(num1) , last_row(num1) from tb;"
)
tdSql
.
checkRows
(
1
)
# stable
...
...
@@ -340,8 +342,10 @@ class TDTestCase:
tdSql
.
query
(
"select ceil(c1) , bottom(c1,1) from stb1;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select udf1(c1) , last_row(c1) from stb1;"
)
tdSql
.
error
(
"select ceil(c1) , last_row(c1) from stb1;"
)
tdSql
.
query
(
"select udf1(c1) , last_row(c1) from stb1;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select ceil(c1) , last_row(c1) from stb1;"
)
tdSql
.
checkRows
(
1
)
# regular table with compute functions
...
...
tests/system-test/0-others/udf_create.py
浏览文件 @
e09ddf75
...
...
@@ -315,9 +315,11 @@ class TDTestCase:
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select udf1(num1) , bottom(num1,1) from tb;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select udf1(num1) , last_row(num1) from tb;"
)
tdSql
.
query
(
"select udf1(num1) , last_row(num1) from tb;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select round(num1) , last_row(num1) from tb;"
)
tdSql
.
query
(
"select round(num1) , last_row(num1) from tb;"
)
tdSql
.
checkRows
(
1
)
# stable
...
...
@@ -342,8 +344,10 @@ class TDTestCase:
tdSql
.
query
(
"select ceil(c1) , bottom(c1,1) from stb1;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select udf1(c1) , last_row(c1) from stb1;"
)
tdSql
.
error
(
"select ceil(c1) , last_row(c1) from stb1;"
)
tdSql
.
query
(
"select udf1(c1) , last_row(c1) from stb1;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select ceil(c1) , last_row(c1) from stb1;"
)
tdSql
.
checkRows
(
1
)
# regular table with compute functions
...
...
tests/system-test/0-others/udf_restart_taosd.py
浏览文件 @
e09ddf75
...
...
@@ -312,9 +312,11 @@ class TDTestCase:
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select udf1(num1) , bottom(num1,1) from tb;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select udf1(num1) , last_row(num1) from tb;"
)
tdSql
.
query
(
"select udf1(num1) , last_row(num1) from tb;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select round(num1) , last_row(num1) from tb;"
)
tdSql
.
query
(
"select round(num1) , last_row(num1) from tb;"
)
tdSql
.
checkRows
(
1
)
# stable
...
...
@@ -339,8 +341,10 @@ class TDTestCase:
tdSql
.
query
(
"select ceil(c1) , bottom(c1,1) from stb1;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
error
(
"select udf1(c1) , last_row(c1) from stb1;"
)
tdSql
.
error
(
"select ceil(c1) , last_row(c1) from stb1;"
)
tdSql
.
query
(
"select udf1(c1) , last_row(c1) from stb1;"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select ceil(c1) , last_row(c1) from stb1;"
)
tdSql
.
checkRows
(
1
)
# regular table with compute functions
...
...
tests/system-test/1-insert/test_stmt_insert_query_ex.py
浏览文件 @
e09ddf75
...
...
@@ -184,17 +184,17 @@ class TDTestCase:
#query: string Functions
querystmt
3
=
conn
.
statement
(
"select CHAR_LENGTH(?) from log "
)
queryparam
3
=
new_bind_params
(
1
)
print
(
type
(
queryparam
3
))
queryparam
3
[
0
].
binary
(
'中文字符'
)
querystmt
3
.
bind_param
(
queryparam3
)
querystmt
3
.
execute
()
result
3
=
querystmt3
.
use_result
()
rows
3
=
result3
.
fetch_all
()
print
(
"
4"
,
rows3
)
assert
rows
3
[
0
][
0
]
==
12
,
'fourth case is failed'
assert
rows
3
[
1
][
0
]
==
12
,
'fourth case is failed'
querystmt
9
=
conn
.
statement
(
"select CHAR_LENGTH(?) from log "
)
queryparam
9
=
new_bind_params
(
1
)
print
(
type
(
queryparam
9
))
queryparam
9
[
0
].
binary
(
'中文字符'
)
querystmt
9
.
bind_param
(
queryparam9
)
querystmt
9
.
execute
()
result
9
=
querystmt9
.
use_result
()
rows
9
=
result9
.
fetch_all
()
print
(
"
9"
,
rows9
)
assert
rows
9
[
0
][
0
]
==
12
,
'fourth case is failed'
assert
rows
9
[
1
][
0
]
==
12
,
'fourth case is failed'
# #query: conversion Functions
...
...
tests/system-test/7-tmq/tmqConsFromTsdb.py
浏览文件 @
e09ddf75
...
...
@@ -21,10 +21,10 @@ class TDTestCase:
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db
1
'
,
paraDict
=
{
'dbName'
:
'db
t
'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
...
...
@@ -35,7 +35,7 @@ class TDTestCase:
'rowsPerTbl'
:
10000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
...
...
@@ -43,7 +43,7 @@ class TDTestCase:
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
4
,
replica
=
1
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
]
,
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tdCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
tdLog
.
info
(
"create ctb"
)
...
...
@@ -52,13 +52,12 @@ class TDTestCase:
tmqCom
.
insert_data
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])
tdDnodes
.
stop
(
1
)
time
.
sleep
(
2
)
tdDnodes
.
start
(
1
)
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as stable %s"
%
(
topicNameList
[
0
],
paraDict
[
'stbName'
])
#
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
#
sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
...
...
@@ -71,31 +70,219 @@ class TDTestCase:
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:
false, auto.commit.interval.ms:6
000, auto.offset.reset:earliest'
keyList
=
'group.id:cgrp1, enable.auto.commit:
true, auto.commit.interval.ms:1
000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
tdLog
.
exit
(
"
0 tmq consume rows error!"
)
tdLog
.
exit
(
"
%d tmq consume rows error!"
%
consumerId
)
tmqCom
.
checkFileContent
(
consumerId
,
queryString
)
time
.
sleep
(
10
)
for
i
in
range
(
len
(
topicNameList
)):
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
# tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
# tdLog.info("create ctb")
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
totalRowsInserted
=
expectRowsList
[
0
]
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
1
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
(
paraDict
[
"ctbNum"
]
-
7
)
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 0"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
not
(
expectrowcnt
<=
resultList
[
0
]
and
totalRowsInserted
>=
resultList
[
0
]):
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
resultList
[
0
],
expectrowcnt
,
totalRowsInserted
))
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
firstConsumeRows
=
resultList
[
0
]
# reinit consume info, and start tmq_sim, then check consume result
tmqCom
.
initConsumerTable
()
consumerId
=
2
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
(
paraDict
[
"ctbNum"
]
-
3
)
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 1"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeTotalRows
=
firstConsumeRows
+
resultList
[
0
]
if
not
(
expectrowcnt
>=
resultList
[
0
]
and
totalRowsInserted
==
actConsumeTotalRows
):
tdLog
.
info
(
"act consume rows: %d, expect consume rows <= %d "
%
(
resultList
[
0
],
expectrowcnt
))
tdLog
.
info
(
"and sum of two consume rows: %d , total inserted rows: %d"
%
(
actConsumeTotalRows
,
totalRowsInserted
))
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
time
.
sleep
(
10
)
for
i
in
range
(
len
(
topicNameList
)):
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
tmqCase3
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 3: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
# tdLog.info("create stb")
# tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
# tdLog.info("create ctb")
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
totalRowsInserted
=
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
3
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
(
paraDict
[
"ctbNum"
]
-
7
)
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 0"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
not
(
expectrowcnt
<=
resultList
[
0
]
and
totalRowsInserted
>=
resultList
[
0
]):
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
resultList
[
0
],
expectrowcnt
,
totalRowsInserted
))
tdLog
.
exit
(
"0 tmq consume rows error!"
)
firstConsumeRows
=
resultList
[
0
]
# reinit consume info, and start tmq_sim, then check consume result
tmqCom
.
initConsumerTable
()
consumerId
=
4
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
(
paraDict
[
"ctbNum"
]
-
3
)
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 1"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeTotalRows
=
firstConsumeRows
+
resultList
[
0
]
if
not
(
expectrowcnt
>=
resultList
[
0
]
and
totalRowsInserted
==
actConsumeTotalRows
):
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
resultList
[
0
],
expectrowcnt
,
totalRowsInserted
))
tdLog
.
exit
(
"0 tmq consume rows error!"
)
time
.
sleep
(
10
)
for
i
in
range
(
len
(
topicNameList
)):
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 3 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
tmqCase1
()
self
.
tmqCase2
()
def
stop
(
self
):
tdSql
.
close
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录