Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
fda726df
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
fda726df
编写于
11月 23, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
11月 23, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18371 from taosdata/fix/liao_cov
refactor: do some internal refactor.
上级
f06145d3
50a64b1e
变更
12
展开全部
隐藏空白更改
内联
并排
Showing
12 changed file
with
2127 addition
and
2212 deletion
+2127
-2212
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+0
-2
source/libs/executor/CMakeLists.txt
source/libs/executor/CMakeLists.txt
+0
-4
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-144
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+17
-3
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+88
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+0
-192
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+30
-0
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+15
-0
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+18
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1867
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+12
-0
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+1944
-0
未找到文件。
source/common/src/tdatablock.c
浏览文件 @
fda726df
...
...
@@ -1084,8 +1084,6 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
return
0
;
}
int32_t
varColSort
(
SColumnInfoData
*
pColumnInfoData
,
SBlockOrderInfo
*
pOrder
)
{
return
0
;
}
int32_t
blockDataSort_rv
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
)
{
// Allocate the additional buffer.
int64_t
p0
=
taosGetTimestampUs
();
...
...
source/libs/executor/CMakeLists.txt
浏览文件 @
fda726df
...
...
@@ -2,10 +2,6 @@ aux_source_directory(src EXECUTOR_SRC)
#add_library(executor ${EXECUTOR_SRC})
add_library
(
executor STATIC
${
EXECUTOR_SRC
}
)
#set_target_properties(executor PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
# INTERFACE_INCLUDE_DIRECTORIES "${TD_SOURCE_DIR}/include/libs/executor"
# )
target_link_libraries
(
executor
PRIVATE os util common function parser planner qcom vnode scalar nodes index stream
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
fda726df
...
...
@@ -235,16 +235,6 @@ typedef enum {
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef
struct
SSourceDataInfo
{
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
int64_t
startTime
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
const
char
*
taskId
;
}
SSourceDataInfo
;
typedef
struct
SLoadRemoteDataInfo
{
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
...
...
@@ -371,23 +361,8 @@ typedef struct STagScanInfo {
SColMatchInfo
matchInfo
;
int32_t
curPos
;
SReadHandle
readHandle
;
STableListInfo
*
pTableList
;
}
STagScanInfo
;
typedef
struct
SLastrowScanInfo
{
SSDataBlock
*
pRes
;
SReadHandle
readHandle
;
void
*
pLastrowReader
;
SColMatchInfo
matchInfo
;
int32_t
*
pSlotIds
;
SExprSupp
pseudoExprSup
;
int32_t
retrieveType
;
int32_t
currentGroupIndex
;
SSDataBlock
*
pBufferredRes
;
SArray
*
pUidList
;
int32_t
indexOfBufferedRes
;
}
SLastrowScanInfo
;
typedef
enum
EStreamScanMode
{
STREAM_SCAN_FROM_READERHANDLE
=
1
,
STREAM_SCAN_FROM_RES
,
...
...
@@ -504,40 +479,6 @@ typedef struct {
SSnapContext
*
sContext
;
}
SStreamRawScanInfo
;
typedef
struct
SSysTableIndex
{
int8_t
init
;
SArray
*
uids
;
int32_t
lastIdx
;
}
SSysTableIndex
;
typedef
struct
SSysTableScanInfo
{
SRetrieveMetaTableRsp
*
pRsp
;
SRetrieveTableReq
req
;
SEpSet
epSet
;
tsem_t
ready
;
SReadHandle
readHandle
;
int32_t
accountId
;
const
char
*
pUser
;
bool
sysInfo
;
bool
showRewrite
;
SNode
*
pCondition
;
// db_name filter condition, to discard data that are not in current database
SMTbCursor
*
pCur
;
// cursor for iterate the local table meta store.
SSysTableIndex
*
pIdx
;
// idx for local table meta
SColMatchInfo
matchInfo
;
SName
name
;
SSDataBlock
*
pRes
;
int64_t
numOfBlocks
;
// extract basic running information.
SLoadRemoteDataInfo
loadInfo
;
}
SSysTableScanInfo
;
typedef
struct
SBlockDistInfo
{
SSDataBlock
*
pResBlock
;
STsdbReader
*
pHandle
;
SReadHandle
readHandle
;
uint64_t
uid
;
// table uid
}
SBlockDistInfo
;
// todo remove this
typedef
struct
SOptrBasicInfo
{
SResultRowInfo
resultRowInfo
;
SSDataBlock
*
pRes
;
...
...
@@ -603,24 +544,6 @@ typedef struct SAggOperatorInfo {
SExprSupp
scalarExprSup
;
}
SAggOperatorInfo
;
typedef
struct
SProjectOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SLimitInfo
limitInfo
;
bool
mergeDataBlocks
;
SSDataBlock
*
pFinalRes
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SExprSupp
scalarSup
;
uint64_t
groupId
;
SSDataBlock
*
pNextGroupRes
;
}
SIndefOperatorInfo
;
typedef
struct
SFillOperatorInfo
{
struct
SFillInfo
*
pFillInfo
;
SSDataBlock
*
pRes
;
...
...
@@ -638,42 +561,12 @@ typedef struct SFillOperatorInfo {
SExprSupp
noFillExprSupp
;
}
SFillOperatorInfo
;
typedef
struct
SGroupbyOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pGroupCols
;
// group by columns, SArray<SColumn>
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SExprSupp
scalarSup
;
}
SGroupbyOperatorInfo
;
typedef
struct
SDataGroupInfo
{
uint64_t
groupId
;
int64_t
numOfRows
;
SArray
*
pPageList
;
}
SDataGroupInfo
;
// The sort in partition may be needed later.
typedef
struct
SPartitionOperatorInfo
{
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SHashObj
*
pGroupSet
;
// quick locate the window object for each result
SDiskbasedBuf
*
pBuf
;
// query result buffer based on blocked-wised disk file
int32_t
rowCapacity
;
// maximum number of rows for each buffer page
int32_t
*
columnOffset
;
// start position for each column data
SArray
*
sortedGroupArray
;
// SDataGroupInfo sorted by group id
int32_t
groupIndex
;
// group index
int32_t
pageIndex
;
// page index of current group
SExprSupp
scalarSup
;
}
SPartitionOperatorInfo
;
typedef
struct
SWindowRowsSup
{
STimeWindow
win
;
TSKEY
prevTs
;
...
...
@@ -817,33 +710,6 @@ typedef struct SStateWindowOperatorInfo {
STimeWindowAggSupp
twAggSup
;
}
SStateWindowOperatorInfo
;
typedef
struct
SSortOperatorInfo
{
SOptrBasicInfo
binfo
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SColMatchInfo
matchInfo
;
int32_t
bufPageSize
;
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
SLimitInfo
limitInfo
;
}
SSortOperatorInfo
;
typedef
struct
SJoinOperatorInfo
{
SSDataBlock
*
pRes
;
int32_t
joinType
;
int32_t
inputOrder
;
SSDataBlock
*
pLeft
;
int32_t
leftPos
;
SColumnInfo
leftCol
;
SSDataBlock
*
pRight
;
int32_t
rightPos
;
SColumnInfo
rightCol
;
SNode
*
pCondAfterMerge
;
}
SJoinOperatorInfo
;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
...
...
@@ -867,7 +733,6 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
);
bool
hasLimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
void
initLimitInfo
(
const
SNode
*
pLimit
,
const
SNode
*
pSLimit
,
SLimitInfo
*
pLimitInfo
);
void
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
SOperatorInfo
*
pOperator
);
...
...
@@ -897,9 +762,6 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
void
setTbNameColData
(
const
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
,
int32_t
functionId
,
const
char
*
name
);
int32_t
doPrepareScan
(
SOperatorInfo
*
pOperator
,
uint64_t
uid
,
int64_t
ts
);
int32_t
doGetScanStatus
(
SOperatorInfo
*
pOperator
,
uint64_t
*
uid
,
int64_t
*
ts
);
SSDataBlock
*
loadNextDataBlock
(
void
*
param
);
void
setResultRowInitCtx
(
SResultRow
*
pResult
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
);
...
...
@@ -982,9 +844,8 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
void
destroyOperatorInfo
(
SOperatorInfo
*
pOperator
);
int32_t
getMaximumIdleDurationSec
();
...
...
@@ -1012,9 +873,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
int32_t
aggDecodeResultRow
(
SOperatorInfo
*
pOperator
,
char
*
result
);
int32_t
aggEncodeResultRow
(
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
);
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getNumOfRowsInTimeWindow
(
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
pPrimaryColumn
,
int32_t
startPos
,
TSKEY
ekey
,
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
fda726df
...
...
@@ -25,6 +25,20 @@
#include "thash.h"
#include "ttypes.h"
typedef
struct
SCacheRowsScanInfo
{
SSDataBlock
*
pRes
;
SReadHandle
readHandle
;
void
*
pLastrowReader
;
SColMatchInfo
matchInfo
;
int32_t
*
pSlotIds
;
SExprSupp
pseudoExprSup
;
int32_t
retrieveType
;
int32_t
currentGroupIndex
;
SSDataBlock
*
pBufferredRes
;
SArray
*
pUidList
;
int32_t
indexOfBufferedRes
;
}
SCacheRowsScanInfo
;
static
SSDataBlock
*
doScanCache
(
SOperatorInfo
*
pOperator
);
static
void
destroyCacheScanOperator
(
void
*
param
);
static
int32_t
extractCacheScanSlotId
(
const
SArray
*
pColMatchInfo
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
**
pSlotIds
);
...
...
@@ -33,7 +47,7 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM
SOperatorInfo
*
createCacherowsScanOperator
(
SLastRowScanPhysiNode
*
pScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
S
LastrowScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastrow
ScanInfo
));
S
CacheRowsScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SCacheRows
ScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -114,7 +128,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return
NULL
;
}
S
Lastrow
ScanInfo
*
pInfo
=
pOperator
->
info
;
S
CacheRows
ScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableListInfo
*
pTableList
=
pTaskInfo
->
pTableInfoList
;
...
...
@@ -240,7 +254,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
void
destroyCacheScanOperator
(
void
*
param
)
{
S
LastrowScanInfo
*
pInfo
=
(
SLastrow
ScanInfo
*
)
param
;
S
CacheRowsScanInfo
*
pInfo
=
(
SCacheRows
ScanInfo
*
)
param
;
blockDataDestroy
(
pInfo
->
pRes
);
blockDataDestroy
(
pInfo
->
pBufferredRes
);
taosMemoryFree
(
pInfo
->
pSlotIds
);
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
fda726df
...
...
@@ -41,6 +41,16 @@ typedef struct SFetchRspHandleWrapper {
int32_t
sourceIndex
;
}
SFetchRspHandleWrapper
;
typedef
struct
SSourceDataInfo
{
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
int64_t
startTime
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
const
char
*
taskId
;
}
SSourceDataInfo
;
static
void
destroyExchangeOperatorInfo
(
void
*
param
);
static
void
freeBlock
(
void
*
pParam
);
static
void
freeSourceDataInfo
(
void
*
param
);
...
...
@@ -52,6 +62,7 @@ static int32_t getCompletedSources(const SArray* pArray);
static
int32_t
prepareConcurrentlyLoad
(
SOperatorInfo
*
pOperator
);
static
int32_t
seqLoadRemoteData
(
SOperatorInfo
*
pOperator
);
static
int32_t
prepareLoadRemoteData
(
SOperatorInfo
*
pOperator
);
static
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
);
static
void
concurrentlyLoadRemoteDataImpl
(
SOperatorInfo
*
pOperator
,
SExchangeInfo
*
pExchangeInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
@@ -647,3 +658,80 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
)
{
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
currentGroupId
==
0
)
{
// it is the first group
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
// now it is the data from a new group
pLimitInfo
->
remainGroupOffset
-=
1
;
// ignore data block in current group
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
// set current group id of the project operator
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
}
// here check for a new group data, we need to handle the data of the previous group.
if
(
pLimitInfo
->
currentGroupId
!=
0
&&
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
pLimitInfo
->
numOfOutputGroups
+=
1
;
if
((
pLimitInfo
->
slimit
.
limit
>
0
)
&&
(
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_DONE
;
}
// reset the value for a new group data
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
// existing rows that belongs to previous group.
if
(
pBlock
->
info
.
rows
>
0
)
{
return
PROJECT_RETRIEVE_DONE
;
}
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
if
(
pLimitInfo
->
remainOffset
>=
pBlock
->
info
.
rows
)
{
pLimitInfo
->
remainOffset
-=
pBlock
->
info
.
rows
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pLimitInfo
->
remainOffset
<
pBlock
->
info
.
rows
&&
pLimitInfo
->
remainOffset
>
0
)
{
blockDataTrimFirstNRows
(
pBlock
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
}
// check for the limitation in each group
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
PROJECT_RETRIEVE_DONE
;
}
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if
((
!
holdDataInBuf
)
||
(
pBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
||
pLimitInfo
->
slimit
.
offset
!=
-
1
||
pLimitInfo
->
slimit
.
limit
!=
-
1
)
{
return
PROJECT_RETRIEVE_DONE
;
}
else
{
// not full enough, continue to accumulate the output data in the buffer.
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
source/libs/executor/src/executorimpl.c
浏览文件 @
fda726df
...
...
@@ -24,7 +24,6 @@
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tsort.h"
#include "ttime.h"
#include "executorimpl.h"
...
...
@@ -297,8 +296,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64
(
pColData
,
4
,
&
pQueryWindow
->
ekey
);
}
void
cleanupExecTimeWindowInfo
(
SColumnInfoData
*
pColData
)
{
colDataDestroy
(
pColData
);
}
typedef
struct
{
bool
hasAgg
;
int32_t
numOfRows
;
...
...
@@ -1347,42 +1344,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
}
}
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
//
// if (pQueryAttr->limit.offset == pBlockInfo->rows) { // current block will ignore completed
// pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
// pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
// }
//
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
// } else {
// pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
// }
//
// assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
//
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
//
// // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
// TSKEY *keys = (TSKEY *) pColInfoData->pData;
//
// // update the offset value
// pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
// pQueryAttr->limit.offset = 0;
//
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//
// //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
// lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
...
...
@@ -1723,159 +1684,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return
(
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
int32_t
aggEncodeResultRow
(
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
)
{
if
(
result
==
NULL
||
length
==
NULL
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SOptrBasicInfo
*
pInfo
=
(
SOptrBasicInfo
*
)(
pOperator
->
info
);
SAggSupporter
*
pSup
=
(
SAggSupporter
*
)
POINTER_SHIFT
(
pOperator
->
info
,
sizeof
(
SOptrBasicInfo
));
int32_t
size
=
tSimpleHashGetSize
(
pSup
->
pResultRowHashTable
);
size_t
keyLen
=
sizeof
(
uint64_t
)
*
2
;
// estimate the key length
int32_t
totalSize
=
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
size
*
(
sizeof
(
int32_t
)
+
keyLen
+
sizeof
(
int32_t
)
+
pSup
->
resultRowSize
);
// no result
if
(
getTotalBufSize
(
pSup
->
pResultBuf
)
==
0
)
{
*
result
=
NULL
;
*
length
=
0
;
return
TSDB_CODE_SUCCESS
;
}
*
result
=
(
char
*
)
taosMemoryCalloc
(
1
,
totalSize
);
if
(
*
result
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
offset
=
sizeof
(
int32_t
);
*
(
int32_t
*
)(
*
result
+
offset
)
=
size
;
offset
+=
sizeof
(
int32_t
);
// prepare memory
SResultRowPosition
*
pos
=
&
pInfo
->
resultRowInfo
.
cur
;
void
*
pPage
=
getBufPage
(
pSup
->
pResultBuf
,
pos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
pPage
+
pos
->
offset
);
setBufPageDirty
(
pPage
,
true
);
releaseBufPage
(
pSup
->
pResultBuf
,
pPage
);
int32_t
iter
=
0
;
void
*
pIter
=
NULL
;
while
((
pIter
=
tSimpleHashIterate
(
pSup
->
pResultRowHashTable
,
pIter
,
&
iter
)))
{
void
*
key
=
tSimpleHashGetKey
(
pIter
,
&
keyLen
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
pIter
;
pPage
=
(
SFilePage
*
)
getBufPage
(
pSup
->
pResultBuf
,
p1
->
pageId
);
pRow
=
(
SResultRow
*
)((
char
*
)
pPage
+
p1
->
offset
);
setBufPageDirty
(
pPage
,
true
);
releaseBufPage
(
pSup
->
pResultBuf
,
pPage
);
// recalculate the result size
int32_t
realTotalSize
=
offset
+
sizeof
(
int32_t
)
+
keyLen
+
sizeof
(
int32_t
)
+
pSup
->
resultRowSize
;
if
(
realTotalSize
>
totalSize
)
{
char
*
tmp
=
(
char
*
)
taosMemoryRealloc
(
*
result
,
realTotalSize
);
if
(
tmp
==
NULL
)
{
taosMemoryFree
(
*
result
);
*
result
=
NULL
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
*
result
=
tmp
;
}
}
// save key
*
(
int32_t
*
)(
*
result
+
offset
)
=
keyLen
;
offset
+=
sizeof
(
int32_t
);
memcpy
(
*
result
+
offset
,
key
,
keyLen
);
offset
+=
keyLen
;
// save value
*
(
int32_t
*
)(
*
result
+
offset
)
=
pSup
->
resultRowSize
;
offset
+=
sizeof
(
int32_t
);
memcpy
(
*
result
+
offset
,
pRow
,
pSup
->
resultRowSize
);
offset
+=
pSup
->
resultRowSize
;
}
*
(
int32_t
*
)(
*
result
)
=
offset
;
*
length
=
offset
;
return
TDB_CODE_SUCCESS
;
}
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
)
{
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
currentGroupId
==
0
)
{
// it is the first group
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
// now it is the data from a new group
pLimitInfo
->
remainGroupOffset
-=
1
;
// ignore data block in current group
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
// set current group id of the project operator
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
}
// here check for a new group data, we need to handle the data of the previous group.
if
(
pLimitInfo
->
currentGroupId
!=
0
&&
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
pLimitInfo
->
numOfOutputGroups
+=
1
;
if
((
pLimitInfo
->
slimit
.
limit
>
0
)
&&
(
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_DONE
;
}
// reset the value for a new group data
pLimitInfo
->
numOfOutputRows
=
0
;
pLimitInfo
->
remainOffset
=
pLimitInfo
->
limit
.
offset
;
// existing rows that belongs to previous group.
if
(
pBlock
->
info
.
rows
>
0
)
{
return
PROJECT_RETRIEVE_DONE
;
}
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
if
(
pLimitInfo
->
remainOffset
>=
pBlock
->
info
.
rows
)
{
pLimitInfo
->
remainOffset
-=
pBlock
->
info
.
rows
;
blockDataCleanup
(
pBlock
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pLimitInfo
->
remainOffset
<
pBlock
->
info
.
rows
&&
pLimitInfo
->
remainOffset
>
0
)
{
blockDataTrimFirstNRows
(
pBlock
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
}
// check for the limitation in each group
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
PROJECT_RETRIEVE_DONE
;
}
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if
((
!
holdDataInBuf
)
||
(
pBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
||
pLimitInfo
->
slimit
.
offset
!=
-
1
||
pLimitInfo
->
slimit
.
limit
!=
-
1
)
{
return
PROJECT_RETRIEVE_DONE
;
}
else
{
// not full enough, continue to accumulate the output data in the buffer.
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
static
void
doApplyScalarCalculation
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
);
static
void
doHandleRemainBlockForNewGroupImpl
(
SOperatorInfo
*
pOperator
,
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
fda726df
...
...
@@ -27,6 +27,36 @@
#include "thash.h"
#include "ttypes.h"
typedef
struct
SGroupbyOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pGroupCols
;
// group by columns, SArray<SColumn>
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SExprSupp
scalarSup
;
}
SGroupbyOperatorInfo
;
// The sort in partition may be needed later.
typedef
struct
SPartitionOperatorInfo
{
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SHashObj
*
pGroupSet
;
// quick locate the window object for each result
SDiskbasedBuf
*
pBuf
;
// query result buffer based on blocked-wised disk file
int32_t
rowCapacity
;
// maximum number of rows for each buffer page
int32_t
*
columnOffset
;
// start position for each column data
SArray
*
sortedGroupArray
;
// SDataGroupInfo sorted by group id
int32_t
groupIndex
;
// group index
int32_t
pageIndex
;
// page index of current group
SExprSupp
scalarSup
;
}
SPartitionOperatorInfo
;
static
void
*
getCurrentDataGroupInfo
(
const
SPartitionOperatorInfo
*
pInfo
,
SDataGroupInfo
**
pGroupInfo
,
int32_t
len
);
static
int32_t
*
setupColumnOffset
(
const
SSDataBlock
*
pBlock
,
int32_t
rowCapacity
);
static
int32_t
setGroupResultOutputBuf
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
fda726df
...
...
@@ -24,6 +24,21 @@
#include "tmsg.h"
#include "ttypes.h"
typedef
struct
SJoinOperatorInfo
{
SSDataBlock
*
pRes
;
int32_t
joinType
;
int32_t
inputOrder
;
SSDataBlock
*
pLeft
;
int32_t
leftPos
;
SColumnInfo
leftCol
;
SSDataBlock
*
pRight
;
int32_t
rightPos
;
SColumnInfo
rightCol
;
SNode
*
pCondAfterMerge
;
}
SJoinOperatorInfo
;
static
void
setJoinColumnInfo
(
SColumnInfo
*
pColumn
,
const
SColumnNode
*
pColumnNode
);
static
SSDataBlock
*
doMergeJoin
(
struct
SOperatorInfo
*
pOperator
);
static
void
destroyMergeJoinOperator
(
void
*
param
);
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
fda726df
...
...
@@ -17,6 +17,24 @@
#include "executorimpl.h"
#include "functionMgt.h"
typedef
struct
SProjectOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SLimitInfo
limitInfo
;
bool
mergeDataBlocks
;
SSDataBlock
*
pFinalRes
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SExprSupp
scalarSup
;
uint64_t
groupId
;
SSDataBlock
*
pNextGroupRes
;
}
SIndefOperatorInfo
;
static
SSDataBlock
*
doGenerateSourceData
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doProjectOperation
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doApplyIndefinitFunction
(
SOperatorInfo
*
pOperator
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
fda726df
此差异已折叠。
点击以展开。
source/libs/executor/src/sortoperator.c
浏览文件 @
fda726df
...
...
@@ -17,6 +17,18 @@
#include "executorimpl.h"
#include "tdatablock.h"
typedef
struct
SSortOperatorInfo
{
SOptrBasicInfo
binfo
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SColMatchInfo
matchInfo
;
int32_t
bufPageSize
;
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
SLimitInfo
limitInfo
;
}
SSortOperatorInfo
;
static
SSDataBlock
*
doSort
(
SOperatorInfo
*
pOperator
);
static
int32_t
doOpenSortOperator
(
SOperatorInfo
*
pOperator
);
static
int32_t
getExplainExecInfo
(
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
);
...
...
source/libs/executor/src/sysscanoperator.c
0 → 100644
浏览文件 @
fda726df
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录