Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
69ea49c4
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
69ea49c4
编写于
11月 10, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into fix/TD-20052
上级
98b77fe8
b067079d
变更
45
展开全部
隐藏空白更改
内联
并排
Showing
45 changed file
with
1322 addition
and
1227 deletion
+1322
-1227
cmake/cmake.version
cmake/cmake.version
+8
-7
include/common/tdatablock.h
include/common/tdatablock.h
+1
-1
include/util/tdef.h
include/util/tdef.h
+1
-1
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+1
-2
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+15
-16
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+1
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+10
-2
source/dnode/mnode/impl/src/mndShow.c
source/dnode/mnode/impl/src/mndShow.c
+1
-2
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+6
-2
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+38
-6
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+1
-3
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+0
-25
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+1
-11
source/dnode/vnode/src/vnd/vnodeModule.c
source/dnode/vnode/src/vnd/vnodeModule.c
+6
-0
source/libs/command/src/command.c
source/libs/command/src/command.c
+1
-2
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+1
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+7
-5
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+5
-10
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+1
-1
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+638
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+21
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+22
-641
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+9
-24
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+5
-10
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+9
-19
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+27
-70
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+10
-27
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+4
-10
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+52
-97
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+72
-13
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+2
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+32
-4
source/libs/parser/test/parSelectTest.cpp
source/libs/parser/test/parSelectTest.cpp
+2
-0
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+7
-5
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+2
-4
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+9
-0
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+6
-0
source/util/src/tlog.c
source/util/src/tlog.c
+5
-5
tests/script/tsim/db/alter_option.sim
tests/script/tsim/db/alter_option.sim
+2
-2
tests/script/tsim/db/create_all_options.sim
tests/script/tsim/db/create_all_options.sim
+2
-2
tests/script/tsim/parser/having_child.sim
tests/script/tsim/parser/having_child.sim
+1
-0
tests/script/tsim/stream/basic1.sim
tests/script/tsim/stream/basic1.sim
+253
-175
tests/system-test/1-insert/alter_database.py
tests/system-test/1-insert/alter_database.py
+18
-11
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+5
-5
未找到文件。
cmake/cmake.version
浏览文件 @
69ea49c4
...
...
@@ -65,13 +65,14 @@ ELSE ()
ENDIF ()
MESSAGE(STATUS "============= compile version parameter information start ============= ")
MESSAGE(STATUS "ver number:" ${TD_VER_NUMBER})
MESSAGE(STATUS "compatible ver number:" ${TD_VER_COMPATIBLE})
MESSAGE(STATUS "communit commit id:" ${TD_VER_GIT})
MESSAGE(STATUS "build date:" ${TD_VER_DATE})
MESSAGE(STATUS "ver type:" ${TD_VER_VERTYPE})
MESSAGE(STATUS "ver cpu:" ${TD_VER_CPUTYPE})
MESSAGE(STATUS "os type:" ${TD_VER_OSTYPE})
MESSAGE(STATUS "version: " ${TD_VER_NUMBER})
MESSAGE(STATUS "compatible: " ${TD_VER_COMPATIBLE})
MESSAGE(STATUS "commit id: " ${TD_VER_GIT})
MESSAGE(STATUS "build date: " ${TD_VER_DATE})
MESSAGE(STATUS "build type: " ${CMAKE_BUILD_TYPE})
MESSAGE(STATUS "type: " ${TD_VER_VERTYPE})
MESSAGE(STATUS "cpu: " ${TD_VER_CPUTYPE})
MESSAGE(STATUS "os: " ${TD_VER_OSTYPE})
MESSAGE(STATUS "============= compile version parameter information end ============= ")
STRING(REPLACE "." "_" TD_LIB_VER_NUMBER ${TD_VER_NUMBER})
include/common/tdatablock.h
浏览文件 @
69ea49c4
...
...
@@ -244,7 +244,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
SColumnInfoData
createColumnInfoData
(
int16_t
type
,
int32_t
bytes
,
int16_t
colId
);
SColumnInfoData
*
bdGetColumnInfoData
(
const
SSDataBlock
*
pBlock
,
int32_t
index
);
void
blockEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
*
dataLen
,
int32_t
numOfCols
,
int8_t
needCompres
s
);
int32_t
blockEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
numOfCol
s
);
const
char
*
blockDecode
(
SSDataBlock
*
pBlock
,
const
char
*
pData
);
void
blockDebugShowDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
...
...
include/util/tdef.h
浏览文件 @
69ea49c4
...
...
@@ -290,7 +290,7 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_VN_PER_DB 2
#define TSDB_MIN_BUFFER_PER_VNODE 3 // unit MB
#define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB
#define TSDB_DEFAULT_BUFFER_PER_VNODE
9
6
#define TSDB_DEFAULT_BUFFER_PER_VNODE
25
6
#define TSDB_MIN_PAGES_PER_VNODE 64
#define TSDB_MAX_PAGES_PER_VNODE (INT32_MAX - 1)
#define TSDB_DEFAULT_PAGES_PER_VNODE 256
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
69ea49c4
...
...
@@ -442,8 +442,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(
*
pRsp
)
->
numOfRows
=
htonl
(
pBlock
->
info
.
rows
);
(
*
pRsp
)
->
numOfCols
=
htonl
(
SHOW_VARIABLES_RESULT_COLS
);
int32_t
len
=
0
;
blockEncode
(
pBlock
,
(
*
pRsp
)
->
data
,
&
len
,
SHOW_VARIABLES_RESULT_COLS
,
false
);
int32_t
len
=
blockEncode
(
pBlock
,
(
*
pRsp
)
->
data
,
SHOW_VARIABLES_RESULT_COLS
);
ASSERT
(
len
==
rspSize
-
sizeof
(
SRetrieveTableRsp
));
blockDataDestroy
(
pBlock
);
...
...
source/common/src/tdatablock.c
浏览文件 @
69ea49c4
...
...
@@ -2197,7 +2197,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
return
rname
.
ctbShortName
;
}
void
blockEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
*
dataLen
,
int32_t
numOfCols
,
int8_t
needCompress
)
{
int32_t
blockEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
numOfCols
)
{
int32_t
dataLen
=
0
;
// todo extract method
int32_t
*
version
=
(
int32_t
*
)
data
;
*
version
=
1
;
...
...
@@ -2238,7 +2240,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
int32_t
*
colSizes
=
(
int32_t
*
)
data
;
data
+=
numOfCols
*
sizeof
(
int32_t
);
*
dataLen
=
blockDataGetSerialMetaSize
(
numOfCols
);
dataLen
=
blockDataGetSerialMetaSize
(
numOfCols
);
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
for
(
int32_t
col
=
0
;
col
<
numOfCols
;
++
col
)
{
...
...
@@ -2255,26 +2257,23 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
}
data
+=
metaSize
;
(
*
dataLen
)
+=
metaSize
;
dataLen
+=
metaSize
;
if
(
needCompress
)
{
colSizes
[
col
]
=
blockCompressColData
(
pColRes
,
numOfRows
,
data
,
needCompress
);
data
+=
colSizes
[
col
];
(
*
dataLen
)
+=
colSizes
[
col
];
}
else
{
colSizes
[
col
]
=
colDataGetLength
(
pColRes
,
numOfRows
);
(
*
dataLen
)
+=
colSizes
[
col
];
memmove
(
data
,
pColRes
->
pData
,
colSizes
[
col
]);
data
+=
colSizes
[
col
];
}
colSizes
[
col
]
=
colDataGetLength
(
pColRes
,
numOfRows
);
dataLen
+=
colSizes
[
col
];
memmove
(
data
,
pColRes
->
pData
,
colSizes
[
col
]);
data
+=
colSizes
[
col
];
colSizes
[
col
]
=
htonl
(
colSizes
[
col
]);
}
*
actualLen
=
*
dataLen
;
*
actualLen
=
dataLen
;
*
groupId
=
pBlock
->
info
.
groupId
;
ASSERT
(
*
dataLen
>
0
);
uDebug
(
"build data block, actualLen:%d, rows:%d, cols:%d"
,
*
dataLen
,
*
rows
,
*
cols
);
ASSERT
(
dataLen
>
0
);
uDebug
(
"build data block, actualLen:%d, rows:%d, cols:%d"
,
dataLen
,
*
rows
,
*
cols
);
return
dataLen
;
}
const
char
*
blockDecode
(
SSDataBlock
*
pBlock
,
const
char
*
pData
)
{
...
...
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
浏览文件 @
69ea49c4
...
...
@@ -307,8 +307,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pStart
+=
sizeof
(
SSysTableSchema
);
}
int32_t
len
=
0
;
blockEncode
(
pBlock
,
pStart
,
&
len
,
numOfCols
,
false
);
int32_t
len
=
blockEncode
(
pBlock
,
pStart
,
numOfCols
);
pRsp
->
numOfRows
=
htonl
(
pBlock
->
info
.
rows
);
pRsp
->
precision
=
TSDB_TIME_PRECISION_MILLI
;
// millisecond time precision
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
69ea49c4
...
...
@@ -606,7 +606,7 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
}
else
{
if
(
terrno
==
TSDB_CODE_MND_DB_IN_CREATING
)
{
if
(
mndSetRpcInfoForDbTrans
(
pMnode
,
pReq
,
MND_OPER_CREATE_DB
,
createReq
.
db
)
==
0
)
{
mInfo
(
"db:%s, is creating and response after trans finished"
,
createReq
.
db
);
mInfo
(
"db:%s, is creating and
createdb
response after trans finished"
,
createReq
.
db
);
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
goto
_OVER
;
}
else
{
...
...
@@ -1225,6 +1225,14 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
usedbRsp
.
vgVersion
=
usedbReq
.
vgVersion
;
usedbRsp
.
errCode
=
terrno
;
if
(
terrno
==
TSDB_CODE_MND_DB_IN_CREATING
)
{
if
(
mndSetRpcInfoForDbTrans
(
pMnode
,
pReq
,
MND_OPER_CREATE_DB
,
usedbReq
.
db
)
==
0
)
{
mInfo
(
"db:%s, is creating and usedb response after trans finished"
,
usedbReq
.
db
);
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
goto
_OVER
;
}
}
mError
(
"db:%s, failed to process use db req since %s"
,
usedbReq
.
db
,
terrstr
());
}
else
{
if
(
mndCheckDbPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_USE_DB
,
pDb
)
!=
0
)
{
...
...
@@ -1255,7 +1263,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
pReq
->
info
.
rspLen
=
contLen
;
_OVER:
if
(
code
!=
0
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to process use db req since %s"
,
usedbReq
.
db
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndShow.c
浏览文件 @
69ea49c4
...
...
@@ -303,8 +303,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pStart
+=
sizeof
(
SSysTableSchema
);
}
int32_t
len
=
0
;
blockEncode
(
pBlock
,
pStart
,
&
len
,
pShow
->
pMeta
->
numOfColumns
,
false
);
int32_t
len
=
blockEncode
(
pBlock
,
pStart
,
pShow
->
pMeta
->
numOfColumns
);
}
pRsp
->
numOfRows
=
htonl
(
rowsRead
);
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
69ea49c4
...
...
@@ -938,11 +938,15 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SRpcHandleInfo
*
pInfo
=
taosArrayGet
(
pTrans
->
pRpcArray
,
i
);
if
(
pInfo
->
handle
!=
NULL
)
{
mInfo
(
"trans:%d, send rsp, code:0x%x stage:%s app:%p"
,
pTrans
->
id
,
code
,
mndTransStr
(
pTrans
->
stage
),
pInfo
->
ahandle
);
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
code
=
TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL
;
}
if
(
i
!=
0
&&
code
==
0
)
{
code
=
TSDB_CODE_RPC_REDIRECT
;
}
mInfo
(
"trans:%d, client:%d send rsp, code:0x%x stage:%s app:%p"
,
pTrans
->
id
,
i
,
code
,
mndTransStr
(
pTrans
->
stage
),
pInfo
->
ahandle
);
SRpcMsg
rspMsg
=
{.
code
=
code
,
.
info
=
*
pInfo
};
if
(
pTrans
->
originRpcType
==
TDMT_MND_CREATE_DB
)
{
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
69ea49c4
...
...
@@ -110,7 +110,6 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
void
tsdbRowGetColVal
(
TSDBROW
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
);
int32_t
tPutTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
int32_t
tsdbRowCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
// SRowIter
void
tRowIterInit
(
SRowIter
*
pIter
,
TSDBROW
*
pRow
,
STSchema
*
pTSchema
);
...
...
@@ -210,11 +209,10 @@ void tsdbRefMemTable(SMemTable *pMemTable);
void
tsdbUnrefMemTable
(
SMemTable
*
pMemTable
);
SArray
*
tsdbMemTableGetTbDataArray
(
SMemTable
*
pMemTable
);
// STbDataIter
int32_t
tsdbTbDataIterCreate
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
**
ppIter
);
void
*
tsdbTbDataIterDestroy
(
STbDataIter
*
pIter
);
void
tsdbTbDataIterOpen
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
*
pIter
);
TSDBROW
*
tsdbTbDataIterGet
(
STbDataIter
*
pIter
);
bool
tsdbTbDataIterNext
(
STbDataIter
*
pIter
);
int32_t
tsdbTbDataIterCreate
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
**
ppIter
);
void
*
tsdbTbDataIterDestroy
(
STbDataIter
*
pIter
);
void
tsdbTbDataIterOpen
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
*
pIter
);
bool
tsdbTbDataIterNext
(
STbDataIter
*
pIter
);
// STbData
int32_t
tsdbGetNRowsInTbData
(
STbData
*
pTbData
);
// tsdbFile.c ==============================================================================================
...
...
@@ -772,6 +770,40 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
return
0
;
}
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
static
FORCE_INLINE
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
)
{
int32_t
n
=
tGetI64
(
p
,
&
pRow
->
version
);
pRow
->
pTSRow
=
(
STSRow
*
)(
p
+
n
);
n
+=
pRow
->
pTSRow
->
len
;
return
n
;
}
static
FORCE_INLINE
TSDBROW
*
tsdbTbDataIterGet
(
STbDataIter
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
if
(
pIter
->
pRow
)
{
return
pIter
->
pRow
;
}
if
(
pIter
->
backward
)
{
if
(
pIter
->
pNode
==
pIter
->
pTbData
->
sl
.
pHead
)
{
return
NULL
;
}
}
else
{
if
(
pIter
->
pNode
==
pIter
->
pTbData
->
sl
.
pTail
)
{
return
NULL
;
}
}
tGetTSDBRow
((
uint8_t
*
)
SL_NODE_DATA
(
pIter
->
pNode
),
&
pIter
->
row
);
pIter
->
pRow
=
&
pIter
->
row
;
return
pIter
->
pRow
;
}
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
69ea49c4
...
...
@@ -27,9 +27,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve
->
completed
=
1
;
pRetrieve
->
numOfRows
=
htonl
(
pBlock
->
info
.
rows
);
// TODO enable compress
int32_t
actualLen
=
0
;
blockEncode
(
pBlock
,
pRetrieve
->
data
,
&
actualLen
,
numOfCols
,
false
);
int32_t
actualLen
=
blockEncode
(
pBlock
,
pRetrieve
->
data
,
numOfCols
);
actualLen
+=
sizeof
(
SRetrieveTableRsp
);
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
pRsp
->
blockDataLen
,
&
actualLen
);
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
69ea49c4
...
...
@@ -294,31 +294,6 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return
true
;
}
TSDBROW
*
tsdbTbDataIterGet
(
STbDataIter
*
pIter
)
{
// we add here for commit usage
if
(
pIter
==
NULL
)
return
NULL
;
if
(
pIter
->
pRow
)
{
goto
_exit
;
}
if
(
pIter
->
backward
)
{
if
(
pIter
->
pNode
==
pIter
->
pTbData
->
sl
.
pHead
)
{
goto
_exit
;
}
}
else
{
if
(
pIter
->
pNode
==
pIter
->
pTbData
->
sl
.
pTail
)
{
goto
_exit
;
}
}
tGetTSDBRow
((
uint8_t
*
)
SL_NODE_DATA
(
pIter
->
pNode
),
&
pIter
->
row
);
pIter
->
pRow
=
&
pIter
->
row
;
_exit:
return
pIter
->
pRow
;
}
static
int32_t
tsdbMemTableRehash
(
SMemTable
*
pMemTable
)
{
int32_t
code
=
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
69ea49c4
...
...
@@ -973,7 +973,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
int32_t
mid
=
dumpedRows
>>
1u
;
int8_t
*
pts
=
(
int8_t
*
)
pColData
->
pData
;
for
(
int32_t
j
=
0
;
j
<
mid
;
++
j
)
{
int
64
_t
t
=
pts
[
j
];
int
8
_t
t
=
pts
[
j
];
pts
[
j
]
=
pts
[
dumpedRows
-
j
-
1
];
pts
[
dumpedRows
-
j
-
1
]
=
t
;
}
...
...
@@ -998,7 +998,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
int32_t
mid
=
dumpedRows
>>
1u
;
int32_t
*
pts
=
(
int32_t
*
)
pColData
->
pData
;
for
(
int32_t
j
=
0
;
j
<
mid
;
++
j
)
{
int
64
_t
t
=
pts
[
j
];
int
32
_t
t
=
pts
[
j
];
pts
[
j
]
=
pts
[
dumpedRows
-
j
-
1
];
pts
[
dumpedRows
-
j
-
1
]
=
t
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
69ea49c4
...
...
@@ -575,16 +575,6 @@ int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
return
n
;
}
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
)
{
int32_t
n
=
0
;
n
+=
tGetI64
(
p
,
&
pRow
->
version
);
pRow
->
pTSRow
=
(
STSRow
*
)(
p
+
n
);
n
+=
pRow
->
pTSRow
->
len
;
return
n
;
}
int32_t
tsdbRowCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
return
tsdbKeyCmprFn
(
&
TSDBROW_KEY
((
TSDBROW
*
)
p1
),
&
TSDBROW_KEY
((
TSDBROW
*
)
p2
));
}
...
...
@@ -1053,7 +1043,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
tRowIterInit
(
&
rIter
,
pRow
,
pTSchema
);
pColVal
=
tRowIterNext
(
&
rIter
);
for
(
int32_t
iColData
=
0
;
iColData
<
pBlockData
->
nColData
;
iColData
++
)
{
SColData
*
pColData
=
tBlockDataGetColDataByIdx
(
pBlockData
,
iColData
)
;
SColData
*
pColData
=
&
((
SColData
*
)
pBlockData
->
aColData
->
pData
)[
iColData
]
;
while
(
pColVal
&&
pColVal
->
cid
<
pColData
->
cid
)
{
pColVal
=
tRowIterNext
(
&
rIter
);
...
...
source/dnode/vnode/src/vnd/vnodeModule.c
浏览文件 @
69ea49c4
...
...
@@ -37,6 +37,12 @@ struct SVnodeGlobal vnodeGlobal;
static
void
*
loop
(
void
*
arg
);
static
tsem_t
canCommit
=
{
0
};
static
void
vnodeInitCommit
()
{
tsem_init
(
&
canCommit
,
0
,
4
);
};
void
vnode_wait_commit
()
{
tsem_wait
(
&
canCommit
);
}
void
vnode_done_commit
()
{
tsem_wait
(
&
canCommit
);
}
int
vnodeInit
(
int
nthreads
)
{
int8_t
init
;
int
ret
;
...
...
source/libs/command/src/command.c
浏览文件 @
69ea49c4
...
...
@@ -39,8 +39,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(
*
pRsp
)
->
numOfRows
=
htonl
(
pBlock
->
info
.
rows
);
(
*
pRsp
)
->
numOfCols
=
htonl
(
numOfCols
);
int32_t
len
=
0
;
blockEncode
(
pBlock
,
(
*
pRsp
)
->
data
,
&
len
,
numOfCols
,
false
);
int32_t
len
=
blockEncode
(
pBlock
,
(
*
pRsp
)
->
data
,
numOfCols
);
ASSERT
(
len
==
rspSize
-
sizeof
(
SRetrieveTableRsp
));
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/command/src/explain.c
浏览文件 @
69ea49c4
...
...
@@ -1610,8 +1610,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp
->
completed
=
1
;
rsp
->
numOfRows
=
htonl
(
rowNum
);
int32_t
len
=
0
;
blockEncode
(
pBlock
,
rsp
->
data
,
&
len
,
taosArrayGetSize
(
pBlock
->
pDataBlock
),
0
);
int32_t
len
=
blockEncode
(
pBlock
,
rsp
->
data
,
taosArrayGetSize
(
pBlock
->
pDataBlock
));
ASSERT
(
len
==
rspSize
-
sizeof
(
SRetrieveTableRsp
));
rsp
->
compLen
=
htonl
(
len
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
69ea49c4
...
...
@@ -842,10 +842,8 @@ typedef struct SJoinOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
void
doDestroyExchangeOperatorInfo
(
void
*
param
);
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
streamFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_explain_fn_t
explain
);
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_explain_fn_t
explain
);
int32_t
operatorDummyOpenFn
(
SOperatorInfo
*
pOperator
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
...
...
@@ -881,7 +879,11 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
);
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doDestroyExchangeOperatorInfo
(
void
*
param
);
void
setOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
setOperatorInfo
(
SOperatorInfo
*
pOperator
,
const
char
*
name
,
int32_t
type
,
bool
blocking
,
int32_t
status
,
void
*
pInfo
,
SExecTaskInfo
*
pTaskInfo
);
void
doFilter
(
SSDataBlock
*
pBlock
,
SFilterInfo
*
pFilterInfo
,
SColMatchInfo
*
pColMatchInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
const
SExprInfo
*
pExpr
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
,
STableMetaCacheInfo
*
pCache
);
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
69ea49c4
...
...
@@ -93,16 +93,11 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
p
->
pCtx
=
createSqlFunctionCtx
(
p
->
pExprInfo
,
p
->
numOfExprs
,
&
p
->
rowEntryInfoOffset
);
}
pOperator
->
name
=
"LastrowScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
setOperatorInfo
(
pOperator
,
"CachedRowScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doScanCache
,
NULL
,
NULL
,
destroyLastrowScanOperator
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doScanCache
,
NULL
,
destroyLastrowScanOperator
,
NULL
);
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
...
...
@@ -126,7 +121,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
uint64_t
suid
=
tableListGetSuid
(
pTableList
);
int32_t
size
=
tableListGetSize
(
pTableList
);
if
(
size
==
0
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -182,7 +177,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo
->
indexOfBufferedRes
+=
1
;
return
pRes
;
}
else
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
}
else
{
...
...
@@ -234,7 +229,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
}
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
69ea49c4
...
...
@@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry
->
dataLen
=
0
;
pBuf
->
useSize
=
sizeof
(
SDataCacheEntry
);
blockEncode
(
pInput
->
pData
,
pEntry
->
data
,
&
pEntry
->
dataLen
,
numOfCols
,
pEntry
->
compressed
);
pEntry
->
dataLen
=
blockEncode
(
pInput
->
pData
,
pEntry
->
data
,
numOfCols
);
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
...
...
source/libs/executor/src/exchangeoperator.c
0 → 100644
浏览文件 @
69ea49c4
此差异已折叠。
点击以展开。
source/libs/executor/src/executor.c
浏览文件 @
69ea49c4
...
...
@@ -1106,3 +1106,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
return
0
;
}
void
qProcessRspMsg
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
info
.
ahandle
;
assert
(
pMsg
->
info
.
ahandle
!=
NULL
);
SDataBuf
buf
=
{.
len
=
pMsg
->
contLen
,
.
pData
=
NULL
};
if
(
pMsg
->
contLen
>
0
)
{
buf
.
pData
=
taosMemoryCalloc
(
1
,
pMsg
->
contLen
);
if
(
buf
.
pData
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
pMsg
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
memcpy
(
buf
.
pData
,
pMsg
->
pCont
,
pMsg
->
contLen
);
}
}
pSendInfo
->
fp
(
pSendInfo
->
param
,
&
buf
,
pMsg
->
code
);
rpcFreeCont
(
pMsg
->
pCont
);
destroySendMsgInfo
(
pSendInfo
);
}
\ No newline at end of file
source/libs/executor/src/executorimpl.c
浏览文件 @
69ea49c4
此差异已折叠。
点击以展开。
source/libs/executor/src/groupoperator.c
浏览文件 @
69ea49c4
...
...
@@ -316,7 +316,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
doFilter
(
pRes
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
if
(
!
hasRemainResults
(
&
pInfo
->
groupResInfo
))
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
...
...
@@ -438,15 +438,10 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
setOperatorInfo
(
pOperator
,
"GroupbyAggOperator"
,
0
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
NULL
,
destroyGroupOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
destroyGroupOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -654,7 +649,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
// try next group data
++
pInfo
->
groupIndex
;
if
(
pInfo
->
groupIndex
>=
taosArrayGetSize
(
pInfo
->
sortedGroupArray
))
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
clearPartitionOperator
(
pInfo
);
return
NULL
;
}
...
...
@@ -821,17 +816,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto
_error
;
}
pOperator
->
name
=
"PartitionOperator"
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PARTITION
;
setOperatorInfo
(
pOperator
,
"PartitionOperator"
,
QUERY_NODE_PHYSICAL_PLAN_PARTITION
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
hashPartition
,
NULL
,
NULL
,
destroyPartitionOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
hashPartition
,
NULL
,
destroyPartitionOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
...
...
@@ -965,7 +955,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
pInfo
->
pInputDataBlock
=
NULL
;
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
printDataBlock
(
pBlock
,
"stream partitionby recv"
);
...
...
@@ -1106,15 +1096,10 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPartNode
->
part
.
pTargets
,
NULL
,
&
numOfCols
);
pOperator
->
name
=
"StreamPartitionOperator"
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
;
setOperatorInfo
(
pOperator
,
"StreamPartitionOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamHashPartition
,
NULL
,
NULL
,
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamHashPartition
,
NULL
,
destroyStreamPartitionOperatorInfo
,
NULL
);
initParDownStream
(
downstream
,
&
pInfo
->
partitionSup
,
&
pInfo
->
scalarSup
);
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
69ea49c4
...
...
@@ -73,14 +73,10 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
pInfo
->
pRes
=
pResBlock
;
pOperator
->
name
=
"MergeJoinOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
setOperatorInfo
(
pOperator
,
"MergeJoinOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
extractTimeCondition
(
pInfo
,
pDownstream
,
numOfDownstream
,
pJoinNode
);
...
...
@@ -121,8 +117,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo
->
inputOrder
=
TSDB_ORDER_DESC
;
}
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doMergeJoin
,
NULL
,
NULL
,
destroyMergeJoinOperator
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doMergeJoin
,
NULL
,
destroyMergeJoinOperator
,
NULL
);
code
=
appendDownstream
(
pOperator
,
pDownstream
,
numOfDownstream
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -372,13 +367,13 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
if
(
leftTs
==
rightTs
)
{
mergeJoinJoinDownstreamTsRanges
(
pOperator
,
leftTs
,
pRes
,
&
nrows
);
}
else
if
(
asc
&&
leftTs
<
rightTs
||
!
asc
&&
leftTs
>
rightTs
)
{
}
else
if
(
(
asc
&&
leftTs
<
rightTs
)
||
(
!
asc
&&
leftTs
>
rightTs
)
)
{
pJoinInfo
->
leftPos
+=
1
;
if
(
pJoinInfo
->
leftPos
>=
pJoinInfo
->
pLeft
->
info
.
rows
)
{
continue
;
}
}
else
if
(
asc
&&
leftTs
>
rightTs
||
!
asc
&&
leftTs
<
rightTs
)
{
}
else
if
(
(
asc
&&
leftTs
>
rightTs
)
||
(
!
asc
&&
leftTs
<
rightTs
)
)
{
pJoinInfo
->
rightPos
+=
1
;
if
(
pJoinInfo
->
rightPos
>=
pJoinInfo
->
pRight
->
info
.
rows
)
{
continue
;
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
69ea49c4
...
...
@@ -98,13 +98,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
}
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pOperator
->
exprSupp
.
pCtx
,
numOfCols
);
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PROJECT
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doProjectOperation
,
NULL
,
NULL
,
setOperatorInfo
(
pOperator
,
"ProjectOperator"
,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doProjectOperation
,
NULL
,
destroyProjectOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
@@ -153,7 +149,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
if
(
pLimitInfo
->
currentGroupId
!=
0
&&
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
pLimitInfo
->
numOfOutputGroups
+=
1
;
if
((
pLimitInfo
->
slimit
.
limit
>
0
)
&&
(
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
PROJECT_RETRIEVE_DONE
;
}
...
...
@@ -187,7 +183,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
// TODO: optimize it later when partition by + limit
if
((
pLimitInfo
->
slimit
.
limit
==
-
1
&&
pLimitInfo
->
currentGroupId
==
0
)
||
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
}
}
...
...
@@ -252,7 +248,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
qDebug
(
"set op close, exec %d, status %d rows %d"
,
pTaskInfo
->
execModel
,
pOperator
->
status
,
pFinalRes
->
info
.
rows
);
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
)
{
...
...
@@ -400,14 +396,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pSup
->
pCtx
,
numOfExpr
);
pOperator
->
name
=
"IndefinitOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doApplyIndefinitFunction
,
NULL
,
NULL
,
destroyIndefinitOperatorInfo
,
NULL
);
setOperatorInfo
(
pOperator
,
"IndefinitOperator"
,
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doApplyIndefinitFunction
,
NULL
,
destroyIndefinitOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -499,7 +489,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
...
...
@@ -628,7 +618,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
pOperator
->
resultInfo
.
totalRows
+=
pRes
->
info
.
rows
;
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
if
(
pOperator
->
cost
.
openCost
==
0
)
{
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
69ea49c4
...
...
@@ -820,7 +820,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
else
{
// scan table group by group sequentially
if
(
pInfo
->
currentGroupId
==
-
1
)
{
if
((
++
pInfo
->
currentGroupId
)
>=
tableListGetOutputGroups
(
pTaskInfo
->
pTableInfoList
))
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -843,7 +843,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
if
((
++
pInfo
->
currentGroupId
)
>=
tableListGetOutputGroups
(
pTaskInfo
->
pTableInfoList
))
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -865,7 +865,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return
result
;
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
}
...
...
@@ -947,13 +947,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo
->
currentGroupId
=
-
1
;
pInfo
->
assignBlockUid
=
pTableScanNode
->
assignBlockUid
;
pOperator
->
name
=
"TableScanOperator"
;
// for debug purpose
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"TableScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
metaCache
.
pTableMetaEntryCache
=
taosLRUCacheInit
(
1024
*
128
,
-
1
,
.
5
);
if
(
pInfo
->
metaCache
.
pTableMetaEntryCache
==
NULL
)
{
...
...
@@ -962,7 +957,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
}
taosLRUCacheSetStrictCapacity
(
pInfo
->
metaCache
.
pTableMetaEntryCache
,
false
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScan
,
NULL
,
NULL
,
destroyTableScanOperatorInfo
,
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScan
,
NULL
,
destroyTableScanOperatorInfo
,
getTableScannerExecInfo
);
// for non-blocking operator, the open cost is always 0
...
...
@@ -986,14 +981,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pInfo
->
dataReader
=
pReadHandle
;
// pInfo->prevGroupId = -1;
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScanImpl
,
NULL
,
NULL
,
NULL
,
NULL
);
setOperatorInfo
(
pOperator
,
"TableSeqScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScanImpl
,
NULL
,
NULL
,
NULL
);
return
pOperator
;
}
...
...
@@ -1148,15 +1137,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
goto
_error
;
}
pOperator
->
name
=
"DataBlockDistScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doBlockInfoScan
,
NULL
,
NULL
,
destroyBlockDistScanOperatorInfo
,
NULL
);
setOperatorInfo
(
pOperator
,
"DataBlockDistScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doBlockInfoScan
,
NULL
,
destroyBlockDistScanOperatorInfo
,
NULL
);
return
pOperator
;
_error:
...
...
@@ -2368,11 +2350,9 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo
->
vnode
=
pHandle
->
vnode
;
pInfo
->
sContext
=
pHandle
->
sContext
;
pOperator
->
name
=
"RawScanOperator"
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
setOperatorInfo
(
pOperator
,
"RawScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doRawScan
,
NULL
,
NULL
,
destroyRawScanOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doRawScan
,
NULL
,
destroyRawScanOperatorInfo
,
NULL
);
return
pOperator
;
_end:
...
...
@@ -2556,16 +2536,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
assignBlockUid
=
pTableScanNode
->
assignBlockUid
;
pInfo
->
partitionSup
.
needCalc
=
false
;
pOperator
->
name
=
"StreamScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"StreamScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
__optr_fn_t
nextFn
=
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
?
doStreamScan
:
doQueueScan
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
nextFn
,
NULL
,
NULL
,
destroyStreamScanOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
nextFn
,
NULL
,
destroyStreamScanOperatorInfo
,
NULL
);
return
pOperator
;
...
...
@@ -2900,7 +2875,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
}
blockDataDestroy
(
dataBlock
);
pInfo
->
loadInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
(
pInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
...
...
@@ -2953,7 +2928,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
if
(
ret
!=
0
)
{
metaCloseTbCursor
(
pInfo
->
pCur
);
pInfo
->
pCur
=
NULL
;
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
}
pInfo
->
loadInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
...
...
@@ -3743,7 +3718,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
}
if
(
i
>=
taosArrayGetSize
(
pIdx
->
uids
))
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
}
else
{
pIdx
->
lastIdx
=
i
+
1
;
}
...
...
@@ -3925,7 +3900,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
if
(
ret
!=
0
)
{
metaCloseTbCursor
(
pInfo
->
pCur
);
pInfo
->
pCur
=
NULL
;
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
}
pInfo
->
loadInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
...
...
@@ -3947,7 +3922,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
doFilterResult
(
pInfo
->
pRes
,
pOperator
->
exprSupp
.
pFilterInfo
);
pInfo
->
loadInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
(
pInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
{
if
(
pInfo
->
showRewrite
==
false
)
{
...
...
@@ -4199,15 +4174,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo
->
readHandle
=
*
(
SReadHandle
*
)
readHandle
;
}
pOperator
->
name
=
"SysTableScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"SysTableScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doSysTableScan
,
NULL
,
NULL
,
destroySysScanOperator
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doSysTableScan
,
NULL
,
destroySysScanOperator
,
NULL
);
return
pOperator
;
_error:
...
...
@@ -4283,7 +4252,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
count
+=
1
;
if
(
++
pInfo
->
curPos
>=
size
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
}
}
...
...
@@ -4335,18 +4304,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
pOperator
->
name
=
"TagScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
setOperatorInfo
(
pOperator
,
"TagScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTagScan
,
NULL
,
NULL
,
destroyTagScanOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTagScan
,
NULL
,
destroyTagScanOperatorInfo
,
NULL
);
return
pOperator
;
...
...
@@ -4713,7 +4675,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pInfo
->
hasGroupId
=
true
;
if
(
tableListSize
==
0
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
pInfo
->
tableStartIndex
=
0
;
...
...
@@ -4732,7 +4694,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
}
else
{
stopGroupTableMergeScan
(
pOperator
);
if
(
pInfo
->
tableEndIndex
>=
tableListSize
-
1
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
pInfo
->
tableStartIndex
=
pInfo
->
tableEndIndex
+
1
;
...
...
@@ -4853,15 +4815,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t
rowSize
=
pInfo
->
pResBlock
->
info
.
rowSize
;
pInfo
->
bufPageSize
=
getProperSortPageSize
(
rowSize
);
pOperator
->
name
=
"TableMergeScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"TableMergeScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableMergeScan
,
NULL
,
NULL
,
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableMergeScan
,
NULL
,
destroyTableMergeScanOperatorInfo
,
getTableMergeScanExplainExecInfo
);
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
69ea49c4
...
...
@@ -53,11 +53,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pInfo
->
pSortInfo
=
createSortInfo
(
pSortNode
->
pSortKeys
);
initLimitInfo
(
pSortNode
->
node
.
pLimit
,
pSortNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pOperator
->
name
=
"SortOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SORT
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"SortOperator"
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
...
...
@@ -67,7 +63,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
// TODO dynamic set the available sort buffer
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenSortOperator
,
doSort
,
NULL
,
NULL
,
destroySortOperatorInfo
,
getExplainExecInfo
);
createOperatorFpSet
(
doOpenSortOperator
,
doSort
,
NULL
,
destroySortOperatorInfo
,
getExplainExecInfo
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -214,7 +210,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
pBlock
=
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
matchInfo
.
pList
,
pInfo
);
if
(
pBlock
==
NULL
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -428,7 +424,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
pInfo
->
prefetchedSortInput
=
pOperator
->
pDownstream
[
0
]
->
fpSet
.
getNextFn
(
pOperator
->
pDownstream
[
0
]);
if
(
pInfo
->
prefetchedSortInput
==
NULL
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
pInfo
->
currGroupId
=
pInfo
->
prefetchedSortInput
->
info
.
groupId
;
...
...
@@ -453,7 +449,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
beginSortGroup
(
pOperator
);
}
else
if
(
pInfo
->
childOpStatus
==
CHILD_OP_FINISHED
)
{
finishSortGroup
(
pOperator
);
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
}
...
...
@@ -509,15 +505,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
}
pInfo
->
pSortInfo
=
createSortInfo
(
pSortPhyNode
->
pSortKeys
);
pOperator
->
name
=
"GroupSortOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doGroupSort
,
NULL
,
NULL
,
destroyGroupSortOperatorInfo
,
setOperatorInfo
(
pOperator
,
"GroupSortOperator"
,
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doGroupSort
,
NULL
,
destroyGroupSortOperatorInfo
,
getGroupSortExplainExecInfo
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
@@ -705,7 +694,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
if
(
pBlock
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
}
else
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
}
return
pBlock
;
...
...
@@ -774,14 +763,8 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo
->
bufPageSize
=
getProperSortPageSize
(
rowSize
);
pInfo
->
sortBufSize
=
pInfo
->
bufPageSize
*
(
numStreams
+
1
);
// one additional is reserved for merged result.
pOperator
->
name
=
"MultiwayMerge"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenMultiwayMergeOperator
,
doMultiwayMerge
,
NULL
,
NULL
,
setOperatorInfo
(
pOperator
,
"MultiwayMergeOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenMultiwayMergeOperator
,
doMultiwayMerge
,
NULL
,
destroyMultiwayMergeOperatorInfo
,
getMultiwayMergeExplainExecInfo
);
code
=
appendDownstream
(
pOperator
,
downStreams
,
numStreams
);
...
...
source/libs/executor/src/tfill.c
浏览文件 @
69ea49c4
...
...
@@ -1443,7 +1443,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
return
pInfo
->
pRes
;
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
resetStreamFillInfo
(
pInfo
);
return
NULL
;
}
...
...
@@ -1512,7 +1512,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
if
(
pInfo
->
pRes
->
info
.
rows
==
0
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
resetStreamFillInfo
(
pInfo
);
return
NULL
;
}
...
...
@@ -1690,15 +1690,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
}
pInfo
->
srcRowIndex
=
0
;
pOperator
->
name
=
"StreamFillOperator"
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
setOperatorInfo
(
pOperator
,
"StreamFillOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamFill
,
NULL
,
NULL
,
destroyStreamFillOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamFill
,
NULL
,
destroyStreamFillOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
69ea49c4
...
...
@@ -1221,7 +1221,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pTaskInfo
->
code
=
pOperator
->
fpSet
.
_openFn
(
pOperator
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -1232,7 +1232,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
...
...
@@ -1269,7 +1269,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
...
...
@@ -1739,7 +1739,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
ASSERT
(
as
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
...
...
@@ -1777,15 +1776,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
pOperator
->
name
=
"TimeIntervalAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"TimeIntervalAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenIntervalAgg
,
doBuildIntervalResult
,
NULL
,
NULL
,
destroyIntervalOperatorInfo
,
NULL
);
createOperatorFpSet
(
doOpenIntervalAgg
,
doBuildIntervalResult
,
NULL
,
destroyIntervalOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1890,7 +1884,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
...
...
@@ -1933,7 +1927,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
...
...
@@ -2281,7 +2275,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
...
...
@@ -2330,7 +2324,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
}
...
...
@@ -2342,7 +2336,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pSliceInfo
->
current
=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
}
...
...
@@ -2365,7 +2359,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
}
...
...
@@ -2386,7 +2380,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
}
else
{
...
...
@@ -2448,7 +2442,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
}
...
...
@@ -2463,7 +2457,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
}
...
...
@@ -2557,15 +2551,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pScanInfo
->
cond
.
twindows
=
pInfo
->
win
;
pScanInfo
->
cond
.
type
=
TIMEWINDOW_RANGE_EXTERNAL
;
pOperator
->
name
=
"TimeSliceOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
setOperatorInfo
(
pOperator
,
"TimeSliceOperator"
,
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTimeslice
,
NULL
,
NULL
,
destroyTimeSliceOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doTimeslice
,
NULL
,
destroyTimeSliceOperatorInfo
,
NULL
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
@@ -2633,15 +2621,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
pInfo
->
tsSlotId
=
tsSlotId
;
pOperator
->
name
=
"StateWindowOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"StateWindowOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
openStateWindowAggOptr
,
doStateWindowAgg
,
NULL
,
NULL
,
destroyStateWindowOperatorInfo
,
NULL
);
createOperatorFpSet
(
openStateWindowAggOptr
,
doStateWindowAgg
,
NULL
,
destroyStateWindowOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2711,14 +2694,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
goto
_error
;
}
pOperator
->
name
=
"SessionWindowAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"SessionWindowAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doSessionWindowAgg
,
NULL
,
NULL
,
destroySWindowOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doSessionWindowAgg
,
NULL
,
destroySWindowOperatorInfo
,
NULL
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -3134,7 +3112,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return
pInfo
->
binfo
.
pRes
;
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
if
(
!
IS_FINAL_OP
(
pInfo
))
{
clearFunctionContext
(
&
pOperator
->
exprSupp
);
// semi interval operator clear disk buffer
...
...
@@ -3403,7 +3381,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
NULL
);
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
NULL
);
if
(
pPhyNode
->
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
)
{
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
&
pInfo
->
twAggSup
);
}
...
...
@@ -4027,7 +4005,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return
pBInfo
->
pRes
;
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -4133,7 +4111,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return
pBInfo
->
pRes
;
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -4200,13 +4178,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo
->
pGroupIdTbNameMap
=
taosHashInit
(
1024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_NO_LOCK
);
pOperator
->
name
=
"StreamSessionWindowAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionAgg
,
NULL
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
NULL
);
setOperatorInfo
(
pOperator
,
"StreamSessionWindowAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionAgg
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
NULL
);
if
(
downstream
)
{
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pInfo
->
twAggSup
.
waterMark
,
pOperator
->
operatorType
,
pInfo
->
primaryTsIndex
);
...
...
@@ -4257,7 +4233,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext
(
&
pOperator
->
exprSupp
);
// semi interval operator clear disk buffer
clearStreamSessionOperator
(
pInfo
);
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
}
...
...
@@ -4336,7 +4312,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext
(
&
pOperator
->
exprSupp
);
// semi interval operator clear disk buffer
clearStreamSessionOperator
(
pInfo
);
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -4347,20 +4323,21 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
if
(
pOperator
==
NULL
)
{
goto
_error
;
}
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
pPhyNode
->
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION
)
{
pInfo
->
isFinal
=
true
;
pOperator
->
name
=
"StreamSessionFinalAggOperator"
;
}
else
{
pInfo
->
isFinal
=
false
;
pInfo
->
isFinal
=
(
pPhyNode
->
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION
);
char
*
name
=
(
pInfo
->
isFinal
)
?
"StreamSessionFinalAggOperator"
:
"StreamSessionSemiAggOperator"
;
if
(
pPhyNode
->
type
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION
)
{
pInfo
->
pUpdateRes
=
createSpecialDataBlock
(
STREAM_CLEAR
);
blockDataEnsureCapacity
(
pInfo
->
pUpdateRes
,
128
);
pOperator
->
name
=
"StreamSessionSemiAggOperator"
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionSemiAgg
,
NULL
,
NULL
,
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionSemiAgg
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
NULL
);
}
setOperatorInfo
(
pOperator
,
name
,
pPhyNode
->
type
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pInfo
->
pGroupIdTbNameMap
=
taosHashInit
(
1024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_NO_LOCK
);
...
...
@@ -4590,7 +4567,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return
pBInfo
->
pRes
;
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -4656,7 +4633,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
printDataBlock
(
pBInfo
->
pRes
,
"single state"
);
return
pBInfo
->
pRes
;
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -4721,14 +4698,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo
->
pGroupIdTbNameMap
=
taosHashInit
(
1024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_NO_LOCK
);
pOperator
->
name
=
"StreamStateAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
info
=
pInfo
;
setOperatorInfo
(
pOperator
,
"StreamStateAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamStateAgg
,
NULL
,
NULL
,
destroyStreamStateOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamStateAgg
,
NULL
,
destroyStreamStateOperatorInfo
,
NULL
);
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pInfo
->
twAggSup
.
waterMark
,
pOperator
->
operatorType
,
pInfo
->
primaryTsIndex
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
@@ -4876,7 +4848,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
cleanupAfterGroupResultGen
(
pMiaInfo
,
pRes
);
}
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
break
;
}
...
...
@@ -5001,16 +4973,10 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo
(
&
iaInfo
->
binfo
.
resultRowInfo
);
blockDataEnsureCapacity
(
iaInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
pOperator
->
name
=
"TimeMergeAlignedIntervalAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
info
=
miaInfo
;
setOperatorInfo
(
pOperator
,
"TimeMergeAlignedIntervalAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
,
false
,
OP_NOT_OPENED
,
miaInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
mergeAlignedIntervalAgg
,
NULL
,
NULL
,
destroyMAIOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
mergeAlignedIntervalAgg
,
NULL
,
destroyMAIOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -5254,7 +5220,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
}
if
(
pRes
->
info
.
rows
==
0
)
{
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
}
size_t
rows
=
pRes
->
info
.
rows
;
...
...
@@ -5313,16 +5279,9 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
}
initResultRowInfo
(
&
pIntervalInfo
->
binfo
.
resultRowInfo
);
pOperator
->
name
=
"TimeMergeIntervalAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
info
=
pMergeIntervalInfo
;
setOperatorInfo
(
pOperator
,
"TimeMergeIntervalAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
,
false
,
OP_NOT_OPENED
,
pMergeIntervalInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doMergeIntervalAgg
,
NULL
,
NULL
,
destroyMergeIntervalOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doMergeIntervalAgg
,
NULL
,
destroyMergeIntervalOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -5366,7 +5325,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
deleteIntervalDiscBuf
(
pInfo
->
pState
,
NULL
,
pInfo
->
twAggSup
.
maxTs
-
pInfo
->
twAggSup
.
deleteMark
,
&
pInfo
->
interval
,
&
pInfo
->
delKey
);
doS
etOperatorCompleted
(
pOperator
);
s
etOperatorCompleted
(
pOperator
);
streamStateCommit
(
pTaskInfo
->
streamInfo
.
pState
);
return
NULL
;
}
...
...
@@ -5550,12 +5509,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo
->
pGroupIdTbNameMap
=
taosHashInit
(
1024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_NO_LOCK
);
pOperator
->
name
=
"StreamIntervalOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
NULL
,
setOperatorInfo
(
pOperator
,
"StreamIntervalOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
NULL
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
&
pInfo
->
twAggSup
);
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
69ea49c4
...
...
@@ -3096,27 +3096,86 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
#else
int64_t
*
pts
=
(
int64_t
*
)
pInput
->
pPTS
->
pData
;
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pInputCol
->
hasNull
&&
colDataIsNull
(
pInputCol
,
pInput
->
totalRows
,
i
,
pColAgg
))
{
continue
;
#if 0
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
continue;
}
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data);
pResInfo->numOfRes = 1;
}
}
#else
if
(
!
pInputCol
->
hasNull
)
{
numOfElems
=
1
;
numOfElems
++
;
int32_t
round
=
pInput
->
numOfRows
>>
2
;
int32_t
reminder
=
pInput
->
numOfRows
&
0x03
;
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
TSKEY
cts
=
pts
[
i
];
if
(
pResInfo
->
numOfRes
==
0
||
pInfo
->
ts
<
cts
)
{
doSaveCurrentVal
(
pCtx
,
i
,
cts
,
type
,
data
);
pResInfo
->
numOfRes
=
1
;
int32_t
tick
=
0
;
for
(
int32_t
i
=
pInput
->
startRowIndex
;
tick
<
round
;
i
+=
4
,
tick
+=
1
)
{
int64_t
cts
=
pts
[
i
];
int32_t
chosen
=
i
;
if
(
cts
<
pts
[
i
+
1
])
{
cts
=
pts
[
i
+
1
];
chosen
=
i
+
1
;
}
if
(
cts
<
pts
[
i
+
2
])
{
cts
=
pts
[
i
+
2
];
chosen
=
i
+
2
;
}
if
(
cts
<
pts
[
i
+
3
])
{
cts
=
pts
[
i
+
3
];
chosen
=
i
+
3
;
}
if
(
pResInfo
->
numOfRes
==
0
||
pInfo
->
ts
<
cts
)
{
char
*
data
=
colDataGetData
(
pInputCol
,
chosen
);
doSaveCurrentVal
(
pCtx
,
i
,
cts
,
type
,
data
);
pResInfo
->
numOfRes
=
1
;
}
}
for
(
int32_t
i
=
pInput
->
startRowIndex
+
round
*
4
;
i
<
pInput
->
startRowIndex
+
pInput
->
numOfRows
;
++
i
)
{
if
(
pResInfo
->
numOfRes
==
0
||
pInfo
->
ts
<
pts
[
i
])
{
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
doSaveCurrentVal
(
pCtx
,
i
,
pts
[
i
],
type
,
data
);
pResInfo
->
numOfRes
=
1
;
}
}
}
else
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
startRowIndex
+
pInput
->
numOfRows
;
++
i
)
{
if
(
pInputCol
->
hasNull
&&
colDataIsNull
(
pInputCol
,
pInput
->
totalRows
,
i
,
pColAgg
))
{
continue
;
}
numOfElems
++
;
if
(
pResInfo
->
numOfRes
==
0
||
pInfo
->
ts
<
pts
[
i
])
{
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
doSaveCurrentVal
(
pCtx
,
i
,
pts
[
i
],
type
,
data
);
pResInfo
->
numOfRes
=
1
;
}
}
}
}
#endif
#endif
// save selectivity value for column consisted of all null values
if
(
numOfElems
==
0
)
{
firstlastSaveTupleData
(
pCtx
->
pSrcBlock
,
pInput
->
startRowIndex
,
pCtx
,
pInfo
);
}
SET_VAL
(
pResInfo
,
numOfElems
,
1
);
// SET_VAL(pResInfo, numOfElems, 1);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3266,8 +3325,8 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
#if 0
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// the optimized version only
function
if all tuples in one block are monotonious increasing or descreasing.
// this is NOT always works if project operator exists in downstream.
// the optimized version only
valid
if all tuples in one block are monotonious increasing or descreasing.
// this
assumption
is NOT always works if project operator exists in downstream.
if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
char* data = colDataGetData(pInputCol, i);
...
...
source/libs/function/src/tudf.c
浏览文件 @
69ea49c4
...
...
@@ -131,7 +131,8 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
char
udfdPathLdLib
[
1024
]
=
{
0
};
size_t
udfdLdLibPathLen
=
strlen
(
tsUdfdLdLibPath
);
strncpy
(
udfdPathLdLib
,
tsUdfdLdLibPath
,
udfdLdLibPathLen
);
strncpy
(
udfdPathLdLib
,
tsUdfdLdLibPath
,
tListLen
(
udfdPathLdLib
));
udfdPathLdLib
[
udfdLdLibPathLen
]
=
':'
;
strncpy
(
udfdPathLdLib
+
udfdLdLibPathLen
+
1
,
pathTaosdLdLib
,
sizeof
(
udfdPathLdLib
)
-
udfdLdLibPathLen
-
1
);
if
(
udfdLdLibPathLen
+
taosdLdLibPathLen
<
1024
)
{
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
69ea49c4
...
...
@@ -2393,6 +2393,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
if
(
TSDB_SUPER_TABLE
==
pRealTable
->
pMeta
->
tableType
)
{
pCxt
->
stableQuery
=
true
;
}
if
(
TSDB_SYSTEM_TABLE
==
pRealTable
->
pMeta
->
tableType
&&
isSelectStmt
(
pCxt
->
pCurrStmt
))
{
((
SSelectStmt
*
)
pCxt
->
pCurrStmt
)
->
isTimeLineResult
=
false
;
}
code
=
addNamespace
(
pCxt
,
pRealTable
);
}
break
;
...
...
@@ -3428,6 +3431,19 @@ static SNode* createSetOperProject(const char* pTableAlias, SNode* pNode) {
return
(
SNode
*
)
pCol
;
}
// 0 means equal, 1 means the left shall prevail, -1 means the right shall prevail
static
int32_t
dataTypeComp
(
const
SDataType
*
l
,
const
SDataType
*
r
)
{
if
(
l
->
type
!=
r
->
type
)
{
return
1
;
}
if
(
l
->
bytes
!=
r
->
bytes
)
{
return
l
->
bytes
>
r
->
bytes
?
1
:
-
1
;
}
return
(
l
->
precision
==
r
->
precision
&&
l
->
scale
==
r
->
scale
)
?
0
:
1
;
}
static
int32_t
translateSetOperProject
(
STranslateContext
*
pCxt
,
SSetOperator
*
pSetOperator
)
{
SNodeList
*
pLeftProjections
=
getProjectList
(
pSetOperator
->
pLeft
);
SNodeList
*
pRightProjections
=
getProjectList
(
pSetOperator
->
pRight
);
...
...
@@ -3440,7 +3456,8 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS
FORBOTH
(
pLeft
,
pLeftProjections
,
pRight
,
pRightProjections
)
{
SExprNode
*
pLeftExpr
=
(
SExprNode
*
)
pLeft
;
SExprNode
*
pRightExpr
=
(
SExprNode
*
)
pRight
;
if
(
!
dataTypeEqual
(
&
pLeftExpr
->
resType
,
&
pRightExpr
->
resType
))
{
int32_t
comp
=
dataTypeComp
(
&
pLeftExpr
->
resType
,
&
pRightExpr
->
resType
);
if
(
comp
>
0
)
{
SNode
*
pRightFunc
=
NULL
;
int32_t
code
=
createCastFunc
(
pCxt
,
pRight
,
pLeftExpr
->
resType
,
&
pRightFunc
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
...
...
@@ -3448,9 +3465,20 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS
}
REPLACE_LIST2_NODE
(
pRightFunc
);
pRightExpr
=
(
SExprNode
*
)
pRightFunc
;
}
strcpy
(
pRightExpr
->
aliasName
,
pLeftExpr
->
aliasName
);
pRightExpr
->
aliasName
[
strlen
(
pLeftExpr
->
aliasName
)]
=
'\0'
;
}
else
if
(
comp
<
0
)
{
SNode
*
pLeftFunc
=
NULL
;
int32_t
code
=
createCastFunc
(
pCxt
,
pLeft
,
pRightExpr
->
resType
,
&
pLeftFunc
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
REPLACE_LIST1_NODE
(
pLeftFunc
);
SExprNode
*
pLeftFuncExpr
=
(
SExprNode
*
)
pLeftFunc
;
snprintf
(
pLeftFuncExpr
->
aliasName
,
sizeof
(
pLeftFuncExpr
->
aliasName
),
"%s"
,
pLeftExpr
->
aliasName
);
snprintf
(
pLeftFuncExpr
->
userAlias
,
sizeof
(
pLeftFuncExpr
->
userAlias
),
"%s"
,
pLeftExpr
->
userAlias
);
pLeft
=
pLeftFunc
;
pLeftExpr
=
pLeftFuncExpr
;
}
snprintf
(
pRightExpr
->
aliasName
,
sizeof
(
pRightExpr
->
aliasName
),
"%s"
,
pLeftExpr
->
aliasName
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListMakeStrictAppend
(
&
pSetOperator
->
pProjectionList
,
createSetOperProject
(
pSetOperator
->
stmtName
,
pLeft
)))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/libs/parser/test/parSelectTest.cpp
浏览文件 @
69ea49c4
...
...
@@ -425,6 +425,8 @@ TEST_F(ParserSelectTest, informationSchema) {
run
(
"SELECT * FROM ins_databases WHERE name = 'information_schema'"
);
run
(
"SELECT * FROM ins_tags WHERE db_name = 'test' and table_name = 'st1'"
);
run
(
"SELECT * FROM (SELECT table_name FROM ins_tables) t WHERE table_name = 'a'"
);
}
TEST_F
(
ParserSelectTest
,
withoutFrom
)
{
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
69ea49c4
...
...
@@ -1028,11 +1028,11 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
int32_t
type
=
GET_PARAM_TYPE
(
pInput
);
bool
tzPresent
=
(
inputNum
==
2
)
?
true
:
false
;
char
*
tz
;
int32_t
tzLen
;
char
tz
[
20
]
=
{
0
}
;
int32_t
tzLen
=
0
;
if
(
tzPresent
)
{
tz
=
varDataVal
(
pInput
[
1
].
columnData
->
pData
);
tzLen
=
varDataLen
(
pInput
[
1
].
columnData
->
pData
);
memcpy
(
tz
,
varDataVal
(
pInput
[
1
].
columnData
->
pData
),
tzLen
);
}
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
...
...
@@ -1071,8 +1071,10 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
int32_t
len
=
(
int32_t
)
strlen
(
buf
);
// add timezone string
snprintf
(
buf
+
len
,
tzLen
+
1
,
"%s"
,
tz
);
len
+=
tzLen
;
if
(
tzLen
>
0
)
{
snprintf
(
buf
+
len
,
tzLen
+
1
,
"%s"
,
tz
);
len
+=
tzLen
;
}
if
(
hasFraction
)
{
int32_t
fracLen
=
(
int32_t
)
strlen
(
fraction
)
+
1
;
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
69ea49c4
...
...
@@ -118,8 +118,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve
->
ekey
=
htobe64
(
pBlock
->
info
.
window
.
ekey
);
pRetrieve
->
version
=
htobe64
(
pBlock
->
info
.
version
);
int32_t
actualLen
=
0
;
blockEncode
(
pBlock
,
pRetrieve
->
data
,
&
actualLen
,
numOfCols
,
false
);
int32_t
actualLen
=
blockEncode
(
pBlock
,
pRetrieve
->
data
,
numOfCols
);
SStreamRetrieveReq
req
=
{
.
streamId
=
pTask
->
streamId
,
...
...
@@ -200,8 +199,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pBlock
->
pDataBlock
);
pRetrieve
->
numOfCols
=
htonl
(
numOfCols
);
int32_t
actualLen
=
0
;
blockEncode
(
pBlock
,
pRetrieve
->
data
,
&
actualLen
,
numOfCols
,
false
);
int32_t
actualLen
=
blockEncode
(
pBlock
,
pRetrieve
->
data
,
numOfCols
);
actualLen
+=
sizeof
(
SRetrieveTableRsp
);
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
pReq
->
dataLen
,
&
actualLen
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
69ea49c4
...
...
@@ -1036,6 +1036,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
ret
=
raftStoreClose
(
pSyncNode
->
pRaftStore
);
ASSERT
(
ret
==
0
);
pSyncNode
->
pRaftStore
=
NULL
;
syncRespMgrDestroy
(
pSyncNode
->
pSyncRespMgr
);
pSyncNode
->
pSyncRespMgr
=
NULL
;
...
...
@@ -1931,10 +1932,18 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
SSyncNode
*
pSyncNode
=
pData
->
pSyncNode
;
SSyncTimer
*
pSyncTimer
=
pData
->
pTimer
;
if
(
pSyncNode
==
NULL
)
{
return
;
}
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
return
;
}
if
(
pSyncNode
->
pRaftStore
==
NULL
)
{
return
;
}
// sNTrace(pSyncNode, "eq peer hb timer");
int64_t
timerLogicClock
=
atomic_load_64
(
&
pSyncTimer
->
logicClock
);
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
69ea49c4
...
...
@@ -294,6 +294,12 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
return
0
;
}
// need not truncate
SyncIndex
walCommitVer
=
walGetCommittedVer
(
pWal
);
if
(
fromIndex
<=
walCommitVer
)
{
return
0
;
}
int32_t
code
=
walRollback
(
pWal
,
fromIndex
);
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
...
...
source/util/src/tlog.c
浏览文件 @
69ea49c4
...
...
@@ -317,14 +317,14 @@ static void taosGetLogFileName(char *fn) {
for
(
int32_t
i
=
0
;
i
<
tsLogObj
.
fileNum
;
i
++
)
{
char
fileName
[
LOG_FILE_NAME_LEN
];
s
printf
(
fileName
,
"%s%d.0"
,
fn
,
i
);
s
nprintf
(
fileName
,
LOG_FILE_NAME_LEN
,
"%s%d.0"
,
fn
,
i
);
bool
file1open
=
taosCheckFileIsOpen
(
fileName
);
s
printf
(
fileName
,
"%s%d.1"
,
fn
,
i
);
s
nprintf
(
fileName
,
LOG_FILE_NAME_LEN
,
"%s%d.1"
,
fn
,
i
);
bool
file2open
=
taosCheckFileIsOpen
(
fileName
);
if
(
!
file1open
&&
!
file2open
)
{
s
printf
(
tsLogObj
.
logName
,
"%s%d"
,
fn
,
i
);
s
nprintf
(
tsLogObj
.
logName
,
LOG_FILE_NAME_LEN
,
"%s%d"
,
fn
,
i
);
return
;
}
}
...
...
@@ -586,7 +586,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
int32_t
end
=
0
;
int32_t
remainSize
=
0
;
static
int64_t
lostLine
=
0
;
char
tmpBuf
[
40
]
=
{
0
};
char
tmpBuf
[
128
]
=
{
0
};
int32_t
tmpBufLen
=
0
;
if
(
pLogBuf
==
NULL
||
pLogBuf
->
stop
)
return
-
1
;
...
...
@@ -598,7 +598,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
remainSize
=
(
start
>
end
)
?
(
start
-
end
-
1
)
:
(
start
+
LOG_BUF_SIZE
(
pLogBuf
)
-
end
-
1
);
if
(
lostLine
>
0
)
{
s
printf
(
tmpBuf
,
"...Lost %"
PRId64
" lines here...
\n
"
,
lostLine
);
s
nprintf
(
tmpBuf
,
tListLen
(
tmpBuf
)
,
"...Lost %"
PRId64
" lines here...
\n
"
,
lostLine
);
tmpBufLen
=
(
int32_t
)
strlen
(
tmpBuf
);
}
...
...
tests/script/tsim/db/alter_option.sim
浏览文件 @
69ea49c4
...
...
@@ -38,7 +38,7 @@ endi
print ============= create database
#database_option: {
# | BUFFER value [3~16384, default:
9
6]
# | BUFFER value [3~16384, default:
25
6]
# | PAGES value [64~16384, default: 256]
# | CACHEMODEL value ['node', 'last_row', 'last_value', 'both']
# | WAL_FSYNC_PERIOD value [0 ~ 180000 ms]
...
...
@@ -78,7 +78,7 @@ endi
if $data7_db != 1440000m,1440000m,1440000m then # keep
return -1
endi
if $data8_db !=
9
6 then # buffer
if $data8_db !=
25
6 then # buffer
return -1
endi
if $data9_db != 4 then # pagesize
...
...
tests/script/tsim/db/create_all_options.sim
浏览文件 @
69ea49c4
...
...
@@ -37,7 +37,7 @@ endi
print ============= create database with all options
#database_option: {
# | BUFFER value [3~16384, default:
9
6]
# | BUFFER value [3~16384, default:
25
6]
# | PAGES value [64~16384, default: 256]
# | PAGESIZE value [1~16384, default: 4]
# | CACHEMODEL value ['node', 'last_row', 'last_value', 'both', default: 'node']
...
...
@@ -98,7 +98,7 @@ endi
if $data7_db != 5256000m,5256000m,5256000m then # keep
return -1
endi
if $data8_db !=
9
6 then # buffer
if $data8_db !=
25
6 then # buffer
return -1
endi
if $data9_db != 4 then # pagesize
...
...
tests/script/tsim/parser/having_child.sim
浏览文件 @
69ea49c4
...
...
@@ -733,6 +733,7 @@ sql select avg(f1),count(tb1.*),sum(f1),stddev(f1),LEASTSQUARES(f1,1,1) from tb1
sql select avg(f1),count(tb1.*),sum(f1),stddev(f1),LEASTSQUARES(f1,1,1) from tb1 group by f1 having sum(f1) > 2 order by f1;
if $rows != 3 then
print expect 3 , actual: $rows
return -1
endi
if $data00 != 2.000000000 then
...
...
tests/script/tsim/stream/basic1.sim
浏览文件 @
69ea49c4
...
...
@@ -23,443 +23,516 @@ sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 1000
$loop_count = 0
loop0:
sleep 200
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop0
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
print =====
data01
=$data01
goto loop0
endi
if $data02 != 2 then
print ======$data02
return -1
print =====
data02
=$data02
goto loop0
endi
if $data03 != 5 then
print ======$data03
return -1
print =====
data03
=$data03
goto loop0
endi
if $data04 != 2 then
print ======$data04
return -1
print =====
data04
=$data04
goto loop0
endi
if $data05 != 3 then
print ======$data05
return -1
print =====
data05
=$data05
goto loop0
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
print =====
data11
=$data11
goto loop0
endi
if $data12 != 1 then
print ======$data12
return -1
print =====
data12
=$data12
goto loop0
endi
if $data13 != 2 then
print ======$data13
return -1
print =====
data13
=$data13
goto loop0
endi
if $data14 != 2 then
print ======$data14
return -1
print =====
data14
=$data14
goto loop0
endi
if $data15 != 3 then
print ======$data15
return -1
print =====
data15
=$data15
goto loop0
endi
# row 2
if $data21 != 1 then
print ======$data21
print =====
data21
=$data21
return -1
endi
if $data22 != 1 then
print ======$data22
print =====
data22
=$data22
return -1
endi
if $data23 != 3 then
print ======$data23
print =====
data23
=$data23
return -1
endi
if $data24 != 2 then
print ======$data24
print =====
data24
=$data24
return -1
endi
if $data25 != 3 then
print ======$data25
print =====
data25
=$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
print =====
data31
=$data31
return -1
endi
if $data32 != 1 then
print ======$data32
print =====
data32
=$data32
return -1
endi
if $data33 != 4 then
print ======$data33
print =====
data33
=$data33
return -1
endi
if $data34 != 2 then
print ======$data34
print =====
data34
=$data34
return -1
endi
if $data35 != 3 then
print ======$data35
print =====
data35
=$data35
return -1
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500
$loop_count = 0
loop1:
sleep 200
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print count(*) , count(d) , sum(a) , max(b) , min(c)
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then
print ======$rows
return -
1
goto loop
1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -
1
print =====
data01
=$data01
goto loop
1
endi
if $data02 != 2 then
print ======$data02
return -
1
print =====
data02
=$data02
goto loop
1
endi
if $data03 != 5 then
print ======$data03
return -
1
print =====
data03
=$data03
goto loop
1
endi
if $data04 != 2 then
print ======$data04
return -
1
print =====
data04
=$data04
goto loop
1
endi
if $data05 != 3 then
print ======$data05
return -
1
print =====
data05
=$data05
goto loop
1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -
1
print =====
data11
=$data11
goto loop
1
endi
if $data12 != 1 then
print ======$data12
return -
1
print =====
data12
=$data12
goto loop
1
endi
if $data13 != 12 then
print ======$data13
return -
1
print =====
data13
=$data13
goto loop
1
endi
if $data14 != 14 then
print ======$data14
return -
1
print =====
data14
=$data14
goto loop
1
endi
if $data15 != 13 then
print ======$data15
return -
1
print =====
data15
=$data15
goto loop
1
endi
# row 2
if $data21 != 1 then
print ======$data21
print =====
data21
=$data21
return -1
endi
if $data22 != 1 then
print ======$data22
print =====
data22
=$data22
return -1
endi
if $data23 != 3 then
print ======$data23
print =====
data23
=$data23
return -1
endi
if $data24 != 2 then
print ======$data24
print =====
data24
=$data24
return -1
endi
if $data25 != 3 then
print ======$data25
print =====
data25
=$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
print =====
data31
=$data31
return -1
endi
if $data32 != 1 then
print ======$data32
print =====
data32
=$data32
return -1
endi
if $data33 != 4 then
print ======$data33
print =====
data33
=$data33
return -1
endi
if $data34 != 2 then
print ======$data34
print =====
data34
=$data34
return -1
endi
if $data35 != 3 then
print ======$data35
print =====
data35
=$data35
return -1
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
$loop_count = 0
loop2:
sleep 200
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
# row 1
if $data11 != 2 then
print ======$data11
return -1
print =====
data11
=$data11
goto loop2
endi
if $data12 != 2 then
print ======$data12
return -1
print =====
data12
=$data12
goto loop2
endi
if $data13 != 24 then
print ======$data13
return -1
print =====
data13
=$data13
goto loop2
endi
if $data14 != 14 then
print ======$data14
return -1
print =====
data14
=$data14
goto loop2
endi
if $data15 != 13 then
print ======$data15
return -1
print =====
data15
=$data15
goto loop2
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
$loop_count = 0
loop3:
sleep 200
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
# row 1
if $data11 != 3 then
print ======$data11
return -1
print =====
data11
=$data11
goto loop3
endi
if $data12 != 3 then
print ======$data12
return -1
print =====
data12
=$data12
goto loop3
endi
if $data13 != 36 then
print ======$data13
return -1
print =====
data13
=$data13
goto loop3
endi
if $data14 != 14 then
print ======$data14
return -1
print =====
data14
=$data14
goto loop3
endi
if $data15 != 13 then
print ======$data15
return -1
print =====
data15
=$data15
goto loop3
endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
$loop_count = 0
loop4:
sleep 200
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
# row 1
if $data11 != 3 then
print ======$data11
return -1
print =====
data11
=$data11
goto loop4
endi
if $data12 != 3 then
print ======$data12
return -1
print =====
data12
=$data12
goto loop4
endi
if $data13 != 6 then
print ======$data13
return -1
print =====
data13
=$data13
goto loop4
endi
if $data14 != 3 then
print ======$data14
return -1
print =====
data14
=$data14
goto loop4
endi
if $data15 != 1 then
print ======$data15
return -1
print =====
data15
=$data15
goto loop4
endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100
$loop_count = 0
loop5:
sleep 200
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
# row 2
if $data21 != 2 then
print ======$data21
return -1
print =====
data21
=$data21
goto loop5
endi
if $data22 != 2 then
print ======$data22
return -1
print =====
data22
=$data22
goto loop5
endi
if $data23 != 6 then
print ======$data23
return -1
print =====
data23
=$data23
goto loop5
endi
if $data24 != 2 then
print ======$data24
return -1
print =====
data24
=$data24
goto loop5
endi
if $data25 != 3 then
print ======$data25
return -1
print =====
data25
=$data25
goto loop5
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
$loop_count = 0
loop6:
sleep 200
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
# row 0
if $data01 != 4 then
print ======$data01
return -1
print =====
data01
=$data01
goto loop6
endi
if $data02 != 4 then
print ======$data02
return -1
print =====
data02
=$data02
goto loop6
endi
if $data03 != 50 then
print ======$data03 != 50
return -1
print =====
data03
=$data03 != 50
goto loop6
endi
if $data04 != 20 then
print ======$data04 != 20
return -1
print =====
data04
=$data04 != 20
goto loop6
endi
if $data05 != 3 then
print ======$data05
return -1
print =====
data05
=$data05
goto loop6
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
$loop_count = 0
loop7:
sleep 200
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
# row 1
if $data11 != 4 then
print ======$data11
return -1
print =====
data11
=$data11
goto loop7
endi
if $data12 != 4 then
print ======$data12
return -1
print =====
data12
=$data12
goto loop7
endi
if $data13 != 46 then
print ======$data13 != 46
return -1
print =====
data13
=$data13 != 46
goto loop7
endi
if $data14 != 20 then
print ======$data14 != 20
return -1
print =====
data14
=$data14 != 20
goto loop7
endi
if $data15 != 1 then
print ======$data15
return -1
print =====
data15
=$data15
goto loop7
endi
# row 2
if $data21 != 4 then
print ======$data21
return -1
print =====
data21
=$data21
goto loop7
endi
if $data22 != 4 then
print ======$data22
return -1
print =====
data22
=$data22
goto loop7
endi
if $data23 != 15 then
print ======$data23
return -1
print =====
data23
=$data23
goto loop7
endi
if $data24 != 4 then
print ======$data24
return -1
print =====
data24
=$data24
goto loop7
endi
if $data25 != 3 then
print ======$data25
return -1
print =====
data25
=$data25
goto loop7
endi
sql create database test2 vgroups 1;
...
...
@@ -479,11 +552,11 @@ sql insert into t1 values(1648791213000,1,1,1,1.0) t2 values(1648791213000,2,2,2
$loop_count = 0
loop
0
:
sleep
3
00
loop
8
:
sleep
2
00
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -491,7 +564,7 @@ sql select * from streamt;
if $rows != 4 then
print =====rows=$rows
goto loop
0
goto loop
8
endi
sql insert into t1 values(1648791213000,5,5,5,5.0) t2 values(1648791213000,6,6,6,6.0) t5 values(1648791213000,7,7,7,7.0);
...
...
@@ -499,11 +572,11 @@ sql insert into t1 values(1648791213000,5,5,5,5.0) t2 values(1648791213000,6,6,6
$loop_count = 0
loop
1
:
sleep
3
00
loop
9
:
sleep
2
00
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -511,51 +584,51 @@ sql select * from streamt order by c4 desc;
if $rows != 5 then
print =====rows=$rows
goto loop
1
goto loop
9
endi
# row 0
if $data01 != 1 then
print =====data01=$data01
goto loop
1
goto loop
9
endi
if $data02 != 7 then
print =====data02=$data02
goto loop
1
goto loop
9
endi
# row 1
if $data11 != 1 then
print =====data11=$data11
goto loop
1
goto loop
9
endi
if $data12 != 6 then
print =====data12=$data12
goto loop
1
goto loop
9
endi
# row 2
if $data21 != 1 then
print =====data21=$data21
goto loop
1
goto loop
9
endi
if $data22 != 5 then
print =====data22=$data22
goto loop
1
goto loop
9
endi
sql insert into t1 values(1648791213000,8,8,8,8.0);
$loop_count = 0
loop
2
:
sleep
3
00
loop
10
:
sleep
2
00
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -564,28 +637,29 @@ sql select * from streamt order by c4 desc;
# row 0
if $data01 != 1 then
print =====data01=$data01
goto loop
2
goto loop
10
endi
if $data02 != 8 then
print =====data02=$data02
goto loop
2
goto loop
10
endi
$loop_count = 0
loop3:
sleep 300
loop11:
sleep 200
sql select count(*) from streamt3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
sql select count(*) from streamt3;
# row 0
if $data00 != 5 then
print =====data00=$data00
goto loop
3
goto loop
11
endi
#max,min selectivity
...
...
@@ -601,25 +675,26 @@ sql insert into ts1 values(1648791222001,2,2,3);
sleep 50
$loop_count = 0
loop3:
loop12:
sleep 200
sql select * from streamtST3;
sleep 300
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
# row 0
if $data02 != 1 then
print =====data02=$data02
goto loop
3
goto loop
12
endi
# row 1
if $data12 != 2 then
print =====data12=$data12
goto loop
3
goto loop
12
endi
...
...
@@ -629,19 +704,22 @@ sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from t1 where a > 5 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop13:
sleep 200
sql select * from streamt4;
# row 0
if $rows != 0 then
print =====rows=$rows
return -1
goto loop13
endi
sql insert into t1 values(1648791213000,6,2,3,1.0);
$loop_count = 0
loop4:
loop
1
4:
sleep 200
sql select * from streamt4;
...
...
@@ -652,13 +730,13 @@ endi
if $data01 != 1 then
print =====data01=$data01
goto loop4
goto loop
1
4
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop5:
loop
1
5:
sleep 200
sql select * from streamt4;
...
...
@@ -669,7 +747,7 @@ endi
if $rows != 0 then
print =====rows=$rows
goto loop5
goto loop
1
5
endi
...
...
tests/system-test/1-insert/alter_database.py
浏览文件 @
69ea49c4
...
...
@@ -10,37 +10,43 @@ from util.sql import *
from
util.cases
import
*
from
util.dnodes
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
self
.
buffer_boundary
=
[
3
,
4097
,
8193
,
12289
,
16384
]
self
.
buffer_error
=
[
self
.
buffer_boundary
[
0
]
-
1
,
self
.
buffer_boundary
[
-
1
]
+
1
,
12289
,
96
]
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
self
.
buffer_boundary
=
[
3
,
4097
,
8193
,
12289
,
16384
]
self
.
buffer_error
=
[
self
.
buffer_boundary
[
0
]
-
1
,
self
.
buffer_boundary
[
-
1
]
+
1
,
12289
,
256
]
# pages_boundary >= 64
self
.
pages_boundary
=
[
64
,
128
,
512
]
self
.
pages_boundary
=
[
64
,
128
,
512
]
self
.
pages_error
=
[
self
.
pages_boundary
[
0
]
-
1
]
def
alter_buffer
(
self
):
tdSql
.
execute
(
'create database db'
)
for
buffer
in
self
.
buffer_boundary
:
tdSql
.
execute
(
f
'alter database db buffer
{
buffer
}
'
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
0
][
8
],
buffer
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
0
][
8
],
buffer
)
tdSql
.
execute
(
'drop database db'
)
tdSql
.
execute
(
'create database db vgroups 10'
)
for
buffer
in
self
.
buffer_error
:
tdSql
.
error
(
f
'alter database db buffer
{
buffer
}
'
)
tdSql
.
execute
(
'drop database db'
)
def
alter_pages
(
self
):
tdSql
.
execute
(
'create database db'
)
for
pages
in
self
.
pages_boundary
:
tdSql
.
execute
(
f
'alter database db pages
{
pages
}
'
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
0
][
10
],
pages
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
0
][
10
],
pages
)
tdSql
.
execute
(
'drop database db'
)
tdSql
.
execute
(
'create database db'
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
self
.
pages_error
.
append
(
tdSql
.
queryResult
[
0
][
10
])
for
pages
in
self
.
pages_error
:
tdSql
.
error
(
f
'alter database db pages
{
pages
}
'
)
...
...
@@ -55,5 +61,6 @@ class TDTestCase:
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
\ No newline at end of file
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
69ea49c4
...
...
@@ -76,8 +76,8 @@ python3 ./test.py -f 2-query/count_partition.py
python3 ./test.py
-f
2-query/count_partition.py
-R
python3 ./test.py
-f
2-query/count.py
python3 ./test.py
-f
2-query/count.py
-R
#
python3 ./test.py -f 2-query/countAlwaysReturnValue.py
#
python3 ./test.py -f 2-query/countAlwaysReturnValue.py -R
python3 ./test.py
-f
2-query/countAlwaysReturnValue.py
python3 ./test.py
-f
2-query/countAlwaysReturnValue.py
-R
python3 ./test.py
-f
2-query/db.py
python3 ./test.py
-f
2-query/db.py
-R
python3 ./test.py
-f
2-query/diff.py
...
...
@@ -393,7 +393,7 @@ python3 ./test.py -f 2-query/max.py -Q 2
python3 ./test.py
-f
2-query/min.py
-Q
2
python3 ./test.py
-f
2-query/mode.py
-Q
2
python3 ./test.py
-f
2-query/count.py
-Q
2
#
python3 ./test.py -f 2-query/countAlwaysReturnValue.py -Q 2
python3 ./test.py
-f
2-query/countAlwaysReturnValue.py
-Q
2
python3 ./test.py
-f
2-query/last.py
-Q
2
python3 ./test.py
-f
2-query/first.py
-Q
2
python3 ./test.py
-f
2-query/To_iso8601.py
-Q
2
...
...
@@ -490,7 +490,7 @@ python3 ./test.py -f 2-query/max.py -Q 3
python3 ./test.py
-f
2-query/min.py
-Q
3
python3 ./test.py
-f
2-query/mode.py
-Q
3
python3 ./test.py
-f
2-query/count.py
-Q
3
#
python3 ./test.py -f 2-query/countAlwaysReturnValue.py -Q 3
python3 ./test.py
-f
2-query/countAlwaysReturnValue.py
-Q
3
python3 ./test.py
-f
2-query/last.py
-Q
3
python3 ./test.py
-f
2-query/first.py
-Q
3
python3 ./test.py
-f
2-query/To_iso8601.py
-Q
3
...
...
@@ -589,7 +589,7 @@ python3 ./test.py -f 2-query/max.py -Q 4
python3 ./test.py
-f
2-query/min.py
-Q
4
python3 ./test.py
-f
2-query/mode.py
-Q
4
python3 ./test.py
-f
2-query/count.py
-Q
4
#
python3 ./test.py -f 2-query/countAlwaysReturnValue.py -Q 4
python3 ./test.py
-f
2-query/countAlwaysReturnValue.py
-Q
4
python3 ./test.py
-f
2-query/last.py
-Q
4
python3 ./test.py
-f
2-query/first.py
-Q
4
python3 ./test.py
-f
2-query/To_iso8601.py
-Q
4
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录