Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2026602a
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2026602a
编写于
11月 23, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into fix/TD-20530
上级
2d49ad37
1a12fa92
变更
21
展开全部
隐藏空白更改
内联
并排
Showing
21 changed file
with
2145 addition
and
2222 deletion
+2145
-2222
Jenkinsfile2
Jenkinsfile2
+1
-1
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
docs/examples/python/tmq_example.py
docs/examples/python/tmq_example.py
+1
-1
packaging/release.sh
packaging/release.sh
+2
-2
source/client/src/clientMain.c
source/client/src/clientMain.c
+2
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+0
-2
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+1
-2
source/libs/executor/CMakeLists.txt
source/libs/executor/CMakeLists.txt
+0
-4
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-144
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+17
-3
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+2
-0
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+88
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+0
-192
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+30
-0
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+15
-0
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+18
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1867
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+12
-0
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+1944
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-0
tests/system-test/7-tmq/tmqDnodeRestart1.py
tests/system-test/7-tmq/tmqDnodeRestart1.py
+7
-3
未找到文件。
Jenkinsfile2
浏览文件 @
2026602a
...
...
@@ -303,7 +303,7 @@ def pre_test_build_win() {
set CL=/MP8
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cmake"
time /t
cmake .. -G "NMake Makefiles JOM" -DBUILD_TEST=true || exit 7
cmake .. -G "NMake Makefiles JOM" -DBUILD_TEST=true
-DBUILD_TOOLS=true
|| exit 7
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jom -j 6"
time /t
jom -j 6 || exit 8
...
...
cmake/taostools_CMakeLists.txt.in
浏览文件 @
2026602a
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e
00ebd9
GIT_TAG e
fa2a5f
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
docs/examples/python/tmq_example.py
浏览文件 @
2026602a
...
...
@@ -4,6 +4,7 @@ from taos.tmq import *
conn
=
taos
.
connect
()
print
(
"init"
)
conn
.
execute
(
"drop topic if exists topic_ctb_column"
)
conn
.
execute
(
"drop database if exists py_tmq"
)
conn
.
execute
(
"create database if not exists py_tmq vgroups 2"
)
conn
.
select_db
(
"py_tmq"
)
...
...
@@ -15,7 +16,6 @@ conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn
.
execute
(
"create table if not exists tb3 using stb1 tags(3)"
)
print
(
"create topic"
)
conn
.
execute
(
"drop topic if exists topic_ctb_column"
)
conn
.
execute
(
"create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
)
...
...
packaging/release.sh
浏览文件 @
2026602a
...
...
@@ -221,12 +221,12 @@ if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" =
# community-version compile
cmake ../
-DCPUTYPE
=
${
cpuType
}
-DWEBSOCKET
=
true
-DOSTYPE
=
${
osType
}
-DSOMODE
=
${
soMode
}
-DDBNAME
=
${
dbName
}
-DVERTYPE
=
${
verType
}
-DVERDATE
=
"
${
build_time
}
"
-DGITINFO
=
${
gitinfo
}
-DGITINFOI
=
${
gitinfoOfInternal
}
-DVERNUMBER
=
${
verNumber
}
-DVERCOMPATIBLE
=
${
verNumberComp
}
-DPAGMODE
=
${
pagMode
}
-DBUILD_HTTP
=
${
BUILD_HTTP
}
-DBUILD_TOOLS
=
${
BUILD_TOOLS
}
${
allocator_macro
}
elif
[
"
$verMode
"
==
"cloud"
]
;
then
cmake ../../
-DCPUTYPE
=
${
cpuType
}
-DWEBSOCKET
=
true
-DBUILD_CLOUD
=
true
-DOSTYPE
=
${
osType
}
-DSOMODE
=
${
soMode
}
-DDBNAME
=
${
dbName
}
-DVERTYPE
=
${
verType
}
-DVERDATE
=
"
${
build_time
}
"
-DGITINFO
=
${
gitinfo
}
-DGITINFOI
=
${
gitinfoOfInternal
}
-DVERNUMBER
=
${
verNumber
}
-DVERCOMPATIBLE
=
${
verNumberComp
}
-DBUILD_HTTP
=
${
BUILD_HTTP
}
-DBUILD_TOOLS
=
${
BUILD_TOOLS
}
${
allocator_macro
}
cmake ../../
-DCPUTYPE
=
${
cpuType
}
-DWEBSOCKET
=
true
-DBUILD_
TAOSX
=
true
-DBUILD_
CLOUD
=
true
-DOSTYPE
=
${
osType
}
-DSOMODE
=
${
soMode
}
-DDBNAME
=
${
dbName
}
-DVERTYPE
=
${
verType
}
-DVERDATE
=
"
${
build_time
}
"
-DGITINFO
=
${
gitinfo
}
-DGITINFOI
=
${
gitinfoOfInternal
}
-DVERNUMBER
=
${
verNumber
}
-DVERCOMPATIBLE
=
${
verNumberComp
}
-DBUILD_HTTP
=
${
BUILD_HTTP
}
-DBUILD_TOOLS
=
${
BUILD_TOOLS
}
${
allocator_macro
}
elif
[
"
$verMode
"
==
"cluster"
]
;
then
if
[[
"
$dbName
"
!=
"taos"
]]
;
then
replace_enterprise_
$dbName
fi
cmake ../../
-DCPUTYPE
=
${
cpuType
}
-DWEBSOCKET
=
true
-DOSTYPE
=
${
osType
}
-DSOMODE
=
${
soMode
}
-DDBNAME
=
${
dbName
}
-DVERTYPE
=
${
verType
}
-DVERDATE
=
"
${
build_time
}
"
-DGITINFO
=
${
gitinfo
}
-DGITINFOI
=
${
gitinfoOfInternal
}
-DVERNUMBER
=
${
verNumber
}
-DVERCOMPATIBLE
=
${
verNumberComp
}
-DBUILD_HTTP
=
${
BUILD_HTTP
}
-DBUILD_TOOLS
=
${
BUILD_TOOLS
}
${
allocator_macro
}
cmake ../../
-DCPUTYPE
=
${
cpuType
}
-DWEBSOCKET
=
true
-D
BUILD_TAOSX
=
true
-D
OSTYPE
=
${
osType
}
-DSOMODE
=
${
soMode
}
-DDBNAME
=
${
dbName
}
-DVERTYPE
=
${
verType
}
-DVERDATE
=
"
${
build_time
}
"
-DGITINFO
=
${
gitinfo
}
-DGITINFOI
=
${
gitinfoOfInternal
}
-DVERNUMBER
=
${
verNumber
}
-DVERCOMPATIBLE
=
${
verNumberComp
}
-DBUILD_HTTP
=
${
BUILD_HTTP
}
-DBUILD_TOOLS
=
${
BUILD_TOOLS
}
${
allocator_macro
}
fi
else
echo
"input cpuType=
${
cpuType
}
error!!!"
...
...
source/client/src/clientMain.c
浏览文件 @
2026602a
...
...
@@ -1106,6 +1106,8 @@ int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId
return
terrno
;
}
pRequest
->
syncQuery
=
true
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SCatalog
*
pCtg
=
NULL
;
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCtg
);
...
...
source/common/src/tdatablock.c
浏览文件 @
2026602a
...
...
@@ -1084,8 +1084,6 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
return
0
;
}
int32_t
varColSort
(
SColumnInfoData
*
pColumnInfoData
,
SBlockOrderInfo
*
pOrder
)
{
return
0
;
}
int32_t
blockDataSort_rv
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
)
{
// Allocate the additional buffer.
int64_t
p0
=
taosGetTimestampUs
();
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
2026602a
...
...
@@ -308,9 +308,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
}
if
(
vnodeIsRoleLeader
(
pTq
->
pVnode
))
{
if
(
taosHashGetSize
(
pTq
->
pStreamMeta
->
pTasks
)
==
0
)
return
0
;
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamMeta
->
pTasks
)
==
0
)
return
0
;
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/libs/executor/CMakeLists.txt
浏览文件 @
2026602a
...
...
@@ -2,10 +2,6 @@ aux_source_directory(src EXECUTOR_SRC)
#add_library(executor ${EXECUTOR_SRC})
add_library
(
executor STATIC
${
EXECUTOR_SRC
}
)
#set_target_properties(executor PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
# INTERFACE_INCLUDE_DIRECTORIES "${TD_SOURCE_DIR}/include/libs/executor"
# )
target_link_libraries
(
executor
PRIVATE os util common function parser planner qcom vnode scalar nodes index stream
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
2026602a
...
...
@@ -235,16 +235,6 @@ typedef enum {
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef
struct
SSourceDataInfo
{
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
int64_t
startTime
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
const
char
*
taskId
;
}
SSourceDataInfo
;
typedef
struct
SLoadRemoteDataInfo
{
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
...
...
@@ -371,23 +361,8 @@ typedef struct STagScanInfo {
SColMatchInfo
matchInfo
;
int32_t
curPos
;
SReadHandle
readHandle
;
STableListInfo
*
pTableList
;
}
STagScanInfo
;
typedef
struct
SLastrowScanInfo
{
SSDataBlock
*
pRes
;
SReadHandle
readHandle
;
void
*
pLastrowReader
;
SColMatchInfo
matchInfo
;
int32_t
*
pSlotIds
;
SExprSupp
pseudoExprSup
;
int32_t
retrieveType
;
int32_t
currentGroupIndex
;
SSDataBlock
*
pBufferredRes
;
SArray
*
pUidList
;
int32_t
indexOfBufferedRes
;
}
SLastrowScanInfo
;
typedef
enum
EStreamScanMode
{
STREAM_SCAN_FROM_READERHANDLE
=
1
,
STREAM_SCAN_FROM_RES
,
...
...
@@ -504,40 +479,6 @@ typedef struct {
SSnapContext
*
sContext
;
}
SStreamRawScanInfo
;
typedef
struct
SSysTableIndex
{
int8_t
init
;
SArray
*
uids
;
int32_t
lastIdx
;
}
SSysTableIndex
;
typedef
struct
SSysTableScanInfo
{
SRetrieveMetaTableRsp
*
pRsp
;
SRetrieveTableReq
req
;
SEpSet
epSet
;
tsem_t
ready
;
SReadHandle
readHandle
;
int32_t
accountId
;
const
char
*
pUser
;
bool
sysInfo
;
bool
showRewrite
;
SNode
*
pCondition
;
// db_name filter condition, to discard data that are not in current database
SMTbCursor
*
pCur
;
// cursor for iterate the local table meta store.
SSysTableIndex
*
pIdx
;
// idx for local table meta
SColMatchInfo
matchInfo
;
SName
name
;
SSDataBlock
*
pRes
;
int64_t
numOfBlocks
;
// extract basic running information.
SLoadRemoteDataInfo
loadInfo
;
}
SSysTableScanInfo
;
typedef
struct
SBlockDistInfo
{
SSDataBlock
*
pResBlock
;
STsdbReader
*
pHandle
;
SReadHandle
readHandle
;
uint64_t
uid
;
// table uid
}
SBlockDistInfo
;
// todo remove this
typedef
struct
SOptrBasicInfo
{
SResultRowInfo
resultRowInfo
;
SSDataBlock
*
pRes
;
...
...
@@ -603,24 +544,6 @@ typedef struct SAggOperatorInfo {
SExprSupp
scalarExprSup
;
}
SAggOperatorInfo
;
typedef
struct
SProjectOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SLimitInfo
limitInfo
;
bool
mergeDataBlocks
;
SSDataBlock
*
pFinalRes
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SExprSupp
scalarSup
;
uint64_t
groupId
;
SSDataBlock
*
pNextGroupRes
;
}
SIndefOperatorInfo
;
typedef
struct
SFillOperatorInfo
{
struct
SFillInfo
*
pFillInfo
;
SSDataBlock
*
pRes
;
...
...
@@ -638,42 +561,12 @@ typedef struct SFillOperatorInfo {
SExprSupp
noFillExprSupp
;
}
SFillOperatorInfo
;
typedef
struct
SGroupbyOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pGroupCols
;
// group by columns, SArray<SColumn>
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SExprSupp
scalarSup
;
}
SGroupbyOperatorInfo
;
typedef
struct
SDataGroupInfo
{
uint64_t
groupId
;
int64_t
numOfRows
;
SArray
*
pPageList
;
}
SDataGroupInfo
;
// The sort in partition may be needed later.
typedef
struct
SPartitionOperatorInfo
{
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SHashObj
*
pGroupSet
;
// quick locate the window object for each result
SDiskbasedBuf
*
pBuf
;
// query result buffer based on blocked-wised disk file
int32_t
rowCapacity
;
// maximum number of rows for each buffer page
int32_t
*
columnOffset
;
// start position for each column data
SArray
*
sortedGroupArray
;
// SDataGroupInfo sorted by group id
int32_t
groupIndex
;
// group index
int32_t
pageIndex
;
// page index of current group
SExprSupp
scalarSup
;
}
SPartitionOperatorInfo
;
typedef
struct
SWindowRowsSup
{
STimeWindow
win
;
TSKEY
prevTs
;
...
...
@@ -817,33 +710,6 @@ typedef struct SStateWindowOperatorInfo {
STimeWindowAggSupp
twAggSup
;
}
SStateWindowOperatorInfo
;
typedef
struct
SSortOperatorInfo
{
SOptrBasicInfo
binfo
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SColMatchInfo
matchInfo
;
int32_t
bufPageSize
;
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
SLimitInfo
limitInfo
;
}
SSortOperatorInfo
;
typedef
struct
SJoinOperatorInfo
{
SSDataBlock
*
pRes
;
int32_t
joinType
;
int32_t
inputOrder
;
SSDataBlock
*
pLeft
;
int32_t
leftPos
;
SColumnInfo
leftCol
;
SSDataBlock
*
pRight
;
int32_t
rightPos
;
SColumnInfo
rightCol
;
SNode
*
pCondAfterMerge
;
}
SJoinOperatorInfo
;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
...
...
@@ -867,7 +733,6 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
);
bool
hasLimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
void
initLimitInfo
(
const
SNode
*
pLimit
,
const
SNode
*
pSLimit
,
SLimitInfo
*
pLimitInfo
);
void
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
SOperatorInfo
*
pOperator
);
...
...
@@ -897,9 +762,6 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
void
setTbNameColData
(
const
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
,
int32_t
functionId
,
const
char
*
name
);
int32_t
doPrepareScan
(
SOperatorInfo
*
pOperator
,
uint64_t
uid
,
int64_t
ts
);
int32_t
doGetScanStatus
(
SOperatorInfo
*
pOperator
,
uint64_t
*
uid
,
int64_t
*
ts
);
SSDataBlock
*
loadNextDataBlock
(
void
*
param
);
void
setResultRowInitCtx
(
SResultRow
*
pResult
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
);
...
...
@@ -982,9 +844,8 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
void
destroyOperatorInfo
(
SOperatorInfo
*
pOperator
);
int32_t
getMaximumIdleDurationSec
();
...
...
@@ -1012,9 +873,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
int32_t
aggDecodeResultRow
(
SOperatorInfo
*
pOperator
,
char
*
result
);
int32_t
aggEncodeResultRow
(
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
);
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getNumOfRowsInTimeWindow
(
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
pPrimaryColumn
,
int32_t
startPos
,
TSKEY
ekey
,
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
2026602a
...
...
@@ -25,6 +25,20 @@
#include "thash.h"
#include "ttypes.h"
typedef
struct
SCacheRowsScanInfo
{
SSDataBlock
*
pRes
;
SReadHandle
readHandle
;
void
*
pLastrowReader
;
SColMatchInfo
matchInfo
;
int32_t
*
pSlotIds
;
SExprSupp
pseudoExprSup
;
int32_t
retrieveType
;
int32_t
currentGroupIndex
;
SSDataBlock
*
pBufferredRes
;
SArray
*
pUidList
;
int32_t
indexOfBufferedRes
;
}
SCacheRowsScanInfo
;
static
SSDataBlock
*
doScanCache
(
SOperatorInfo
*
pOperator
);
static
void
destroyCacheScanOperator
(
void
*
param
);
static
int32_t
extractCacheScanSlotId
(
const
SArray
*
pColMatchInfo
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
**
pSlotIds
);
...
...
@@ -33,7 +47,7 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM
SOperatorInfo
*
createCacherowsScanOperator
(
SLastRowScanPhysiNode
*
pScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
S
LastrowScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastrow
ScanInfo
));
S
CacheRowsScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SCacheRows
ScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -114,7 +128,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return
NULL
;
}
S
Lastrow
ScanInfo
*
pInfo
=
pOperator
->
info
;
S
CacheRows
ScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableListInfo
*
pTableList
=
pTaskInfo
->
pTableInfoList
;
...
...
@@ -240,7 +254,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
void
destroyCacheScanOperator
(
void
*
param
)
{
S
LastrowScanInfo
*
pInfo
=
(
SLastrow
ScanInfo
*
)
param
;
S
CacheRowsScanInfo
*
pInfo
=
(
SCacheRows
ScanInfo
*
)
param
;
blockDataDestroy
(
pInfo
->
pRes
);
blockDataDestroy
(
pInfo
->
pBufferredRes
);
taosMemoryFree
(
pInfo
->
pSlotIds
);
...
...
source/libs/executor/src/dataInserter.c
浏览文件 @
2026602a
...
...
@@ -250,6 +250,8 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
return
code
;
}
taosArrayClear
(
pInserter
->
pDataBlocks
);
code
=
sendSubmitRequest
(
pInserter
,
pMsg
,
pInserter
->
pParam
->
readHandle
->
pMsgCb
->
clientRpc
,
&
pInserter
->
pNode
->
epSet
);
if
(
code
)
{
return
code
;
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
2026602a
...
...
@@ -41,6 +41,16 @@ typedef struct SFetchRspHandleWrapper {
int32_t
sourceIndex
;
}
SFetchRspHandleWrapper
;
typedef
struct
SSourceDataInfo
{
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
int64_t
startTime
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
const
char
*
taskId
;
}
SSourceDataInfo
;
static
void
destroyExchangeOperatorInfo
(
void
*
param
);
static
void
freeBlock
(
void
*
pParam
);
static
void
freeSourceDataInfo
(
void
*
param
);
...
...
@@ -52,6 +62,7 @@ static int32_t getCompletedSources(const SArray* pArray);
static
int32_t
prepareConcurrentlyLoad
(
SOperatorInfo
*
pOperator
);
static
int32_t
seqLoadRemoteData
(
SOperatorInfo
*
pOperator
);
static
int32_t
prepareLoadRemoteData
(
SOperatorInfo
*
pOperator
);
static
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
);
static
void
concurrentlyLoadRemoteDataImpl
(
SOperatorInfo
*
pOperator
,
SExchangeInfo
*
pExchangeInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
@@ -647,3 +658,80 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
)
{
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
currentGroupId
==
0
)
{
// it is the first group
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
// now it is the data from a new group
pLimitInfo
->
remainGroupOffset
-=
1
;
// ignore data block in current group
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
// set current group id of the project operator
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
}
// here check for a new group data, we need to handle the data of the previous group.
if
(
pLimitInfo
->
currentGroupId
!=
0
&&
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
pLimitInfo
->
numOfOutputGroups
+=
1
;
if
((
pLimitInfo
->
slimit
.
limit
>
0
)
&&
(
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_DONE
;
}
// reset the value for a new group data
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
// existing rows that belongs to previous group.
if
(
pBlock
->
info
.
rows
>
0
)
{
return
PROJECT_RETRIEVE_DONE
;
}
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
if
(
pLimitInfo
->
remainOffset
>=
pBlock
->
info
.
rows
)
{
pLimitInfo
->
remainOffset
-=
pBlock
->
info
.
rows
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pLimitInfo
->
remainOffset
<
pBlock
->
info
.
rows
&&
pLimitInfo
->
remainOffset
>
0
)
{
blockDataTrimFirstNRows
(
pBlock
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
}
// check for the limitation in each group
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
PROJECT_RETRIEVE_DONE
;
}
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if
((
!
holdDataInBuf
)
||
(
pBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
||
pLimitInfo
->
slimit
.
offset
!=
-
1
||
pLimitInfo
->
slimit
.
limit
!=
-
1
)
{
return
PROJECT_RETRIEVE_DONE
;
}
else
{
// not full enough, continue to accumulate the output data in the buffer.
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
source/libs/executor/src/executorimpl.c
浏览文件 @
2026602a
...
...
@@ -24,7 +24,6 @@
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tsort.h"
#include "ttime.h"
#include "executorimpl.h"
...
...
@@ -297,8 +296,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64
(
pColData
,
4
,
&
pQueryWindow
->
ekey
);
}
void
cleanupExecTimeWindowInfo
(
SColumnInfoData
*
pColData
)
{
colDataDestroy
(
pColData
);
}
typedef
struct
{
bool
hasAgg
;
int32_t
numOfRows
;
...
...
@@ -1347,42 +1344,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
}
}
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
//
// if (pQueryAttr->limit.offset == pBlockInfo->rows) { // current block will ignore completed
// pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
// pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
// }
//
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
// } else {
// pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
// }
//
// assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
//
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
//
// // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
// TSKEY *keys = (TSKEY *) pColInfoData->pData;
//
// // update the offset value
// pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
// pQueryAttr->limit.offset = 0;
//
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//
// //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
// lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
...
...
@@ -1723,159 +1684,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return
(
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
int32_t
aggEncodeResultRow
(
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
)
{
if
(
result
==
NULL
||
length
==
NULL
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SOptrBasicInfo
*
pInfo
=
(
SOptrBasicInfo
*
)(
pOperator
->
info
);
SAggSupporter
*
pSup
=
(
SAggSupporter
*
)
POINTER_SHIFT
(
pOperator
->
info
,
sizeof
(
SOptrBasicInfo
));
int32_t
size
=
tSimpleHashGetSize
(
pSup
->
pResultRowHashTable
);
size_t
keyLen
=
sizeof
(
uint64_t
)
*
2
;
// estimate the key length
int32_t
totalSize
=
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
size
*
(
sizeof
(
int32_t
)
+
keyLen
+
sizeof
(
int32_t
)
+
pSup
->
resultRowSize
);
// no result
if
(
getTotalBufSize
(
pSup
->
pResultBuf
)
==
0
)
{
*
result
=
NULL
;
*
length
=
0
;
return
TSDB_CODE_SUCCESS
;
}
*
result
=
(
char
*
)
taosMemoryCalloc
(
1
,
totalSize
);
if
(
*
result
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
offset
=
sizeof
(
int32_t
);
*
(
int32_t
*
)(
*
result
+
offset
)
=
size
;
offset
+=
sizeof
(
int32_t
);
// prepare memory
SResultRowPosition
*
pos
=
&
pInfo
->
resultRowInfo
.
cur
;
void
*
pPage
=
getBufPage
(
pSup
->
pResultBuf
,
pos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
pPage
+
pos
->
offset
);
setBufPageDirty
(
pPage
,
true
);
releaseBufPage
(
pSup
->
pResultBuf
,
pPage
);
int32_t
iter
=
0
;
void
*
pIter
=
NULL
;
while
((
pIter
=
tSimpleHashIterate
(
pSup
->
pResultRowHashTable
,
pIter
,
&
iter
)))
{
void
*
key
=
tSimpleHashGetKey
(
pIter
,
&
keyLen
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
pIter
;
pPage
=
(
SFilePage
*
)
getBufPage
(
pSup
->
pResultBuf
,
p1
->
pageId
);
pRow
=
(
SResultRow
*
)((
char
*
)
pPage
+
p1
->
offset
);
setBufPageDirty
(
pPage
,
true
);
releaseBufPage
(
pSup
->
pResultBuf
,
pPage
);
// recalculate the result size
int32_t
realTotalSize
=
offset
+
sizeof
(
int32_t
)
+
keyLen
+
sizeof
(
int32_t
)
+
pSup
->
resultRowSize
;
if
(
realTotalSize
>
totalSize
)
{
char
*
tmp
=
(
char
*
)
taosMemoryRealloc
(
*
result
,
realTotalSize
);
if
(
tmp
==
NULL
)
{
taosMemoryFree
(
*
result
);
*
result
=
NULL
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
*
result
=
tmp
;
}
}
// save key
*
(
int32_t
*
)(
*
result
+
offset
)
=
keyLen
;
offset
+=
sizeof
(
int32_t
);
memcpy
(
*
result
+
offset
,
key
,
keyLen
);
offset
+=
keyLen
;
// save value
*
(
int32_t
*
)(
*
result
+
offset
)
=
pSup
->
resultRowSize
;
offset
+=
sizeof
(
int32_t
);
memcpy
(
*
result
+
offset
,
pRow
,
pSup
->
resultRowSize
);
offset
+=
pSup
->
resultRowSize
;
}
*
(
int32_t
*
)(
*
result
)
=
offset
;
*
length
=
offset
;
return
TDB_CODE_SUCCESS
;
}
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
)
{
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
currentGroupId
==
0
)
{
// it is the first group
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
// now it is the data from a new group
pLimitInfo
->
remainGroupOffset
-=
1
;
// ignore data block in current group
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
// set current group id of the project operator
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
}
// here check for a new group data, we need to handle the data of the previous group.
if
(
pLimitInfo
->
currentGroupId
!=
0
&&
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
pLimitInfo
->
numOfOutputGroups
+=
1
;
if
((
pLimitInfo
->
slimit
.
limit
>
0
)
&&
(
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_DONE
;
}
// reset the value for a new group data
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
// existing rows that belongs to previous group.
if
(
pBlock
->
info
.
rows
>
0
)
{
return
PROJECT_RETRIEVE_DONE
;
}
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
if
(
pLimitInfo
->
remainOffset
>=
pBlock
->
info
.
rows
)
{
pLimitInfo
->
remainOffset
-=
pBlock
->
info
.
rows
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pLimitInfo
->
remainOffset
<
pBlock
->
info
.
rows
&&
pLimitInfo
->
remainOffset
>
0
)
{
blockDataTrimFirstNRows
(
pBlock
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
}
// check for the limitation in each group
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
PROJECT_RETRIEVE_DONE
;
}
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if
((
!
holdDataInBuf
)
||
(
pBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
||
pLimitInfo
->
slimit
.
offset
!=
-
1
||
pLimitInfo
->
slimit
.
limit
!=
-
1
)
{
return
PROJECT_RETRIEVE_DONE
;
}
else
{
// not full enough, continue to accumulate the output data in the buffer.
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
static
void
doApplyScalarCalculation
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
);
static
void
doHandleRemainBlockForNewGroupImpl
(
SOperatorInfo
*
pOperator
,
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
2026602a
...
...
@@ -27,6 +27,36 @@
#include "thash.h"
#include "ttypes.h"
typedef
struct
SGroupbyOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pGroupCols
;
// group by columns, SArray<SColumn>
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SExprSupp
scalarSup
;
}
SGroupbyOperatorInfo
;
// The sort in partition may be needed later.
typedef
struct
SPartitionOperatorInfo
{
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SHashObj
*
pGroupSet
;
// quick locate the window object for each result
SDiskbasedBuf
*
pBuf
;
// query result buffer based on blocked-wised disk file
int32_t
rowCapacity
;
// maximum number of rows for each buffer page
int32_t
*
columnOffset
;
// start position for each column data
SArray
*
sortedGroupArray
;
// SDataGroupInfo sorted by group id
int32_t
groupIndex
;
// group index
int32_t
pageIndex
;
// page index of current group
SExprSupp
scalarSup
;
}
SPartitionOperatorInfo
;
static
void
*
getCurrentDataGroupInfo
(
const
SPartitionOperatorInfo
*
pInfo
,
SDataGroupInfo
**
pGroupInfo
,
int32_t
len
);
static
int32_t
*
setupColumnOffset
(
const
SSDataBlock
*
pBlock
,
int32_t
rowCapacity
);
static
int32_t
setGroupResultOutputBuf
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
2026602a
...
...
@@ -24,6 +24,21 @@
#include "tmsg.h"
#include "ttypes.h"
typedef
struct
SJoinOperatorInfo
{
SSDataBlock
*
pRes
;
int32_t
joinType
;
int32_t
inputOrder
;
SSDataBlock
*
pLeft
;
int32_t
leftPos
;
SColumnInfo
leftCol
;
SSDataBlock
*
pRight
;
int32_t
rightPos
;
SColumnInfo
rightCol
;
SNode
*
pCondAfterMerge
;
}
SJoinOperatorInfo
;
static
void
setJoinColumnInfo
(
SColumnInfo
*
pColumn
,
const
SColumnNode
*
pColumnNode
);
static
SSDataBlock
*
doMergeJoin
(
struct
SOperatorInfo
*
pOperator
);
static
void
destroyMergeJoinOperator
(
void
*
param
);
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
2026602a
...
...
@@ -17,6 +17,24 @@
#include "executorimpl.h"
#include "functionMgt.h"
typedef
struct
SProjectOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SLimitInfo
limitInfo
;
bool
mergeDataBlocks
;
SSDataBlock
*
pFinalRes
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SExprSupp
scalarSup
;
uint64_t
groupId
;
SSDataBlock
*
pNextGroupRes
;
}
SIndefOperatorInfo
;
static
SSDataBlock
*
doGenerateSourceData
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doProjectOperation
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doApplyIndefinitFunction
(
SOperatorInfo
*
pOperator
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
2026602a
此差异已折叠。
点击以展开。
source/libs/executor/src/sortoperator.c
浏览文件 @
2026602a
...
...
@@ -17,6 +17,18 @@
#include "executorimpl.h"
#include "tdatablock.h"
typedef
struct
SSortOperatorInfo
{
SOptrBasicInfo
binfo
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SColMatchInfo
matchInfo
;
int32_t
bufPageSize
;
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
SLimitInfo
limitInfo
;
}
SSortOperatorInfo
;
static
SSDataBlock
*
doSort
(
SOperatorInfo
*
pOperator
);
static
int32_t
doOpenSortOperator
(
SOperatorInfo
*
pOperator
);
static
int32_t
getExplainExecInfo
(
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
);
...
...
source/libs/executor/src/sysscanoperator.c
0 → 100644
浏览文件 @
2026602a
此差异已折叠。
点击以展开。
tests/parallel_test/cases.task
浏览文件 @
2026602a
...
...
@@ -704,6 +704,7 @@
,,,system-test,python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqDnodeRestart1.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py
,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py
...
...
tests/system-test/7-tmq/tmqDnodeRestart1.py
浏览文件 @
2026602a
...
...
@@ -149,6 +149,8 @@ class TDTestCase:
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromStb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb
)
tmqCom
.
stopTmqSimProcess
(
processorName
=
"tmq_sim"
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
...
...
@@ -178,6 +180,8 @@ class TDTestCase:
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb"
)
topicFromDb
=
'topic_db'
...
...
@@ -203,10 +207,10 @@ class TDTestCase:
tmqCom
.
getStartCommitNotifyFromTmqsim
(
'cdb'
,
1
)
tdLog
.
info
(
"create some new child table and insert data for latest mode"
)
paraDict
[
"batchNum"
]
=
10
0
paraDict
[
"batchNum"
]
=
10
paraDict
[
"ctbPrefix"
]
=
'newCtb'
paraDict
[
"ctbNum"
]
=
10
paraDict
[
"rowsPerTbl"
]
=
10
paraDict
[
"ctbNum"
]
=
10
0
paraDict
[
"rowsPerTbl"
]
=
10
0
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tdLog
.
info
(
"================= restart dnode ==========================="
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录