Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
aee4c7a2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
aee4c7a2
编写于
4月 06, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/shm
上级
563d2ec6
af32cfca
变更
69
展开全部
隐藏空白更改
内联
并排
Showing
69 changed file
with
4192 addition
and
2454 deletion
+4192
-2454
include/common/tmsg.h
include/common/tmsg.h
+16
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
include/common/ttypes.h
include/common/ttypes.h
+1
-0
include/common/tvariant.h
include/common/tvariant.h
+1
-0
include/libs/command/command.h
include/libs/command/command.h
+6
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+2
-0
include/libs/function/function.h
include/libs/function/function.h
+6
-3
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+4
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+27
-0
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+4
-1
include/libs/scalar/filter.h
include/libs/scalar/filter.h
+1
-1
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+1
-1
include/util/taoserror.h
include/util/taoserror.h
+3
-0
include/util/tlockfree.h
include/util/tlockfree.h
+1
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-1
source/client/src/clientMain.c
source/client/src/clientMain.c
+6
-1
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+2
-2
source/common/src/tmsg.c
source/common/src/tmsg.c
+42
-0
source/common/src/tvariant.c
source/common/src/tvariant.c
+25
-1
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+2
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-2
source/libs/command/inc/commandInt.h
source/libs/command/inc/commandInt.h
+49
-23
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+426
-104
source/libs/executor/CMakeLists.txt
source/libs/executor/CMakeLists.txt
+1
-6
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+58
-28
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+9
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+243
-1502
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+507
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+834
-0
source/libs/executor/src/tsimplehash.c
source/libs/executor/src/tsimplehash.c
+3
-3
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+13
-10
source/libs/function/inc/taggfunction.h
source/libs/function/inc/taggfunction.h
+0
-2
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+113
-99
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+297
-45
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+5
-0
source/libs/function/src/taggfunction.c
source/libs/function/src/taggfunction.c
+7
-7
source/libs/nodes/inc/nodesUtil.h
source/libs/nodes/inc/nodesUtil.h
+1
-0
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+8
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+67
-0
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+3
-4
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+100
-1
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-1
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+1
-1
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+2
-2
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+6
-42
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+334
-330
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+66
-1
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+113
-1
source/libs/planner/test/plannerTest.cpp
source/libs/planner/test/plannerTest.cpp
+39
-0
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+4
-2
source/libs/qworker/inc/qworkerMsg.h
source/libs/qworker/inc/qworkerMsg.h
+2
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+58
-45
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+22
-1
source/libs/scalar/inc/sclvector.h
source/libs/scalar/inc/sclvector.h
+2
-0
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+2
-1
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+2
-1
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+71
-36
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+29
-1
source/libs/scalar/test/filter/filterTests.cpp
source/libs/scalar/test/filter/filterTests.cpp
+12
-10
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+5
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+224
-110
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+3
-0
source/util/src/thash.c
source/util/src/thash.c
+10
-6
source/util/src/tlockfree.c
source/util/src/tlockfree.c
+15
-0
tests/script/tsim/query/diff.sim
tests/script/tsim/query/diff.sim
+129
-0
tests/script/tsim/query/session.sim
tests/script/tsim/query/session.sim
+15
-12
tests/script/tsim/query/stddev.sim
tests/script/tsim/query/stddev.sim
+124
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
aee4c7a2
...
...
@@ -930,6 +930,21 @@ typedef struct {
char
data
[];
}
SRetrieveMetaTableRsp
;
typedef
struct
SExplainExecInfo
{
uint64_t
startupCost
;
uint64_t
totalCost
;
uint64_t
numOfRows
;
void
*
verboseInfo
;
}
SExplainExecInfo
;
typedef
struct
{
int32_t
numOfPlans
;
SExplainExecInfo
*
subplanInfo
;
}
SExplainRsp
;
int32_t
tSerializeSExplainRsp
(
void
*
buf
,
int32_t
bufLen
,
SExplainRsp
*
pRsp
);
int32_t
tDeserializeSExplainRsp
(
void
*
buf
,
int32_t
bufLen
,
SExplainRsp
*
pRsp
);
typedef
struct
{
char
fqdn
[
TSDB_FQDN_LEN
];
// end point, hostname:port
int32_t
port
;
...
...
@@ -1067,6 +1082,7 @@ typedef struct SSubQueryMsg {
uint64_t
taskId
;
int64_t
refId
;
int8_t
taskType
;
int8_t
explain
;
uint32_t
sqlLen
;
// the query sql,
uint32_t
phyLen
;
char
msg
[];
...
...
include/common/tmsgdef.h
浏览文件 @
aee4c7a2
...
...
@@ -188,6 +188,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_SHOW_TABLES_FETCH
,
"vnode-show-tables-fetch"
,
SVShowTablesFetchReq
,
SVShowTablesFetchRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_QUERY_CONTINUE
,
"vnode-query-continue"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_QUERY_HEARTBEAT
,
"vnode-query-heartbeat"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_EXPLAIN
,
"vnode-explain"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
...
...
include/common/ttypes.h
浏览文件 @
aee4c7a2
...
...
@@ -146,6 +146,7 @@ typedef struct {
#define IS_FLOAT_TYPE(_t) ((_t) == TSDB_DATA_TYPE_FLOAT || (_t) == TSDB_DATA_TYPE_DOUBLE)
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
#define IS_MATHABLE_TYPE(_t) (IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
#define IS_VALID_TINYINT(_t) ((_t) > INT8_MIN && (_t) <= INT8_MAX)
#define IS_VALID_SMALLINT(_t) ((_t) > INT16_MIN && (_t) <= INT16_MAX)
...
...
include/common/tvariant.h
浏览文件 @
aee4c7a2
...
...
@@ -59,6 +59,7 @@ int32_t taosVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool
#endif
int32_t
taosVariantTypeSetType
(
SVariant
*
pVariant
,
char
type
);
char
*
taosVariantGet
(
SVariant
*
pVar
,
int32_t
type
);
#ifdef __cplusplus
}
...
...
include/libs/command/command.h
浏览文件 @
aee4c7a2
...
...
@@ -17,8 +17,14 @@
#include "tmsg.h"
#include "plannodes.h"
typedef
struct
SExplainCtx
SExplainCtx
;
int32_t
qExecCommand
(
SNode
*
pStmt
,
SRetrieveTableRsp
**
pRsp
);
int32_t
qExecStaticExplain
(
SQueryPlan
*
pDag
,
SRetrieveTableRsp
**
pRsp
);
int32_t
qExecExplainBegin
(
SQueryPlan
*
pDag
,
SExplainCtx
**
pCtx
,
int32_t
startTs
);
int32_t
qExecExplainEnd
(
SExplainCtx
*
pCtx
,
SRetrieveTableRsp
**
pRsp
);
int32_t
qExplainUpdateExecInfo
(
SExplainCtx
*
pCtx
,
SExplainRsp
*
pRspMsg
,
int32_t
groupId
,
SRetrieveTableRsp
**
pRsp
);
void
qExplainFreeCtx
(
SExplainCtx
*
pCtx
);
include/libs/executor/executor.h
浏览文件 @
aee4c7a2
...
...
@@ -174,6 +174,8 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
void
qProcessFetchRsp
(
void
*
parent
,
struct
SRpcMsg
*
pMsg
,
struct
SEpSet
*
pEpSet
);
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
int32_t
*
resNum
,
SExplainExecInfo
**
pRes
);
#ifdef __cplusplus
}
#endif
...
...
include/libs/function/function.h
浏览文件 @
aee4c7a2
...
...
@@ -36,7 +36,7 @@ typedef struct SFuncExecEnv {
typedef
bool
(
*
FExecGetEnv
)(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
typedef
bool
(
*
FExecInit
)(
struct
SqlFunctionCtx
*
pCtx
,
struct
SResultRowEntryInfo
*
pResultCellInfo
);
typedef
void
(
*
FExecProcess
)(
struct
SqlFunctionCtx
*
pCtx
);
typedef
int32_t
(
*
FExecProcess
)(
struct
SqlFunctionCtx
*
pCtx
);
typedef
void
(
*
FExecFinalize
)(
struct
SqlFunctionCtx
*
pCtx
);
typedef
int32_t
(
*
FScalarExecProcess
)(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
);
...
...
@@ -154,7 +154,9 @@ typedef struct SResultDataInfo {
int32_t
interBufSize
;
}
SResultDataInfo
;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
typedef
struct
SInputColumnInfoData
{
int32_t
totalRows
;
// total rows in current columnar data
...
...
@@ -192,7 +194,8 @@ typedef struct SqlFunctionCtx {
int32_t
numOfParams
;
SVariant
param
[
4
];
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t
*
ptsList
;
// corresponding timestamp array list
void
*
ptsOutputBuf
;
// corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SColumnInfoData
*
pTsOutput
;
// corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t
offset
;
SVariant
tag
;
struct
SResultRowEntryInfo
*
resultInfo
;
SSubsidiaryResInfo
subsidiaryRes
;
...
...
include/libs/nodes/nodes.h
浏览文件 @
aee4c7a2
...
...
@@ -163,6 +163,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_PARTITION
,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
,
QUERY_NODE_PHYSICAL_PLAN_INSERT
,
QUERY_NODE_PHYSICAL_SUBPLAN
,
...
...
@@ -237,6 +239,8 @@ int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int3
int32_t
nodesStringToList
(
const
char
*
pStr
,
SNodeList
**
pList
);
int32_t
nodesNodeToSQL
(
SNode
*
pNode
,
char
*
buf
,
int32_t
bufSize
,
int32_t
*
len
);
char
*
nodesGetNameFromColumnNode
(
SNode
*
pNode
);
int32_t
nodesGetOutputNumFromSlotList
(
SNodeList
*
pSlots
);
#ifdef __cplusplus
}
...
...
include/libs/nodes/plannodes.h
浏览文件 @
aee4c7a2
...
...
@@ -104,6 +104,7 @@ typedef struct SWindowLogicNode {
SFillNode
*
pFill
;
int64_t
sessionGap
;
SNode
*
pTspk
;
SNode
*
pStateExpr
;
}
SWindowLogicNode
;
typedef
struct
SSortLogicNode
{
...
...
@@ -194,11 +195,20 @@ typedef struct SSystemTableScanPhysiNode {
int32_t
accountId
;
}
SSystemTableScanPhysiNode
;
typedef
enum
EScanRequired
{
SCAN_REQUIRED_DATA_NO_NEEDED
=
1
,
SCAN_REQUIRED_DATA_STATIS_NEEDED
,
SCAN_REQUIRED_DATA_ALL_NEEDED
,
SCAN_REQUIRED_DATA_DISCARD
,
}
EScanRequired
;
typedef
struct
STableScanPhysiNode
{
SScanPhysiNode
scan
;
uint8_t
scanFlag
;
// denotes reversed scan of data or not
STimeWindow
scanRange
;
double
ratio
;
EScanRequired
scanRequired
;
SNodeList
*
pScanReferFuncs
;
}
STableScanPhysiNode
;
typedef
STableScanPhysiNode
STableSeqScanPhysiNode
;
...
...
@@ -257,17 +267,33 @@ typedef struct SIntervalPhysiNode {
SFillNode
*
pFill
;
}
SIntervalPhysiNode
;
typedef
struct
SMultiTableIntervalPhysiNode
{
SIntervalPhysiNode
interval
;
SNodeList
*
pPartitionKeys
;
}
SMultiTableIntervalPhysiNode
;
typedef
struct
SSessionWinodwPhysiNode
{
SWinodwPhysiNode
window
;
int64_t
gap
;
}
SSessionWinodwPhysiNode
;
typedef
struct
SStateWinodwPhysiNode
{
SWinodwPhysiNode
window
;
SNode
*
pStateKey
;
}
SStateWinodwPhysiNode
;
typedef
struct
SSortPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList
*
pSortKeys
;
// element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
}
SSortPhysiNode
;
typedef
struct
SPartitionPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of partition_by_clause
SNodeList
*
pPartitionKeys
;
}
SPartitionPhysiNode
;
typedef
struct
SDataSinkNode
{
ENodeType
type
;
SDataBlockDescNode
*
pInputDataBlockDesc
;
...
...
@@ -308,6 +334,7 @@ typedef enum EExplainMode {
typedef
struct
SExplainInfo
{
EExplainMode
mode
;
bool
verbose
;
double
ratio
;
}
SExplainInfo
;
typedef
struct
SQueryPlan
{
...
...
include/libs/nodes/querynodes.h
浏览文件 @
aee4c7a2
...
...
@@ -22,6 +22,7 @@ extern "C" {
#include "nodes.h"
#include "tmsg.h"
#include "tvariant.h"
#define TABLE_TOTAL_COL_NUM(pMeta) ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags)
#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + TABLE_TOTAL_COL_NUM((pMeta)) * sizeof(SSchema)))
...
...
@@ -188,7 +189,7 @@ typedef struct SLimitNode {
typedef
struct
SStateWindowNode
{
ENodeType
type
;
// QUERY_NODE_STATE_WINDOW
SNode
*
p
Col
;
SNode
*
p
Expr
;
}
SStateWindowNode
;
typedef
struct
SSessionWindowNode
{
...
...
@@ -316,6 +317,8 @@ bool nodesIsTimelineQuery(const SNode* pQuery);
void
*
nodesGetValueFromNode
(
SValueNode
*
pNode
);
char
*
nodesGetStrValueFromNode
(
SValueNode
*
pNode
);
char
*
getFillModeString
(
EFillMode
mode
);
void
valueNodeToVariant
(
const
SValueNode
*
pNode
,
SVariant
*
pVal
);
#ifdef __cplusplus
}
...
...
include/libs/scalar/filter.h
浏览文件 @
aee4c7a2
...
...
@@ -40,7 +40,7 @@ extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t op
extern
bool
filterExecute
(
SFilterInfo
*
info
,
SSDataBlock
*
pSrc
,
int8_t
**
p
,
SColumnDataAgg
*
statis
,
int16_t
numOfCols
);
extern
int32_t
filterSetDataFromSlotId
(
SFilterInfo
*
info
,
void
*
param
);
extern
int32_t
filterSetDataFromColId
(
SFilterInfo
*
info
,
void
*
param
);
extern
int32_t
filterGetTimeRange
(
S
FilterInfo
*
info
,
STimeWindow
*
win
);
extern
int32_t
filterGetTimeRange
(
S
Node
*
pNode
,
STimeWindow
*
win
,
bool
*
isStrict
);
extern
int32_t
filterConverNcharColumns
(
SFilterInfo
*
pFilterInfo
,
int32_t
rows
,
bool
*
gotNchar
);
extern
int32_t
filterFreeNcharColumns
(
SFilterInfo
*
pFilterInfo
);
extern
void
filterFreeInfo
(
SFilterInfo
*
info
);
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
aee4c7a2
...
...
@@ -71,7 +71,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
SQueryResult
*
pRes
);
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
SQueryResult
*
pRes
);
/**
* Process the query job, generated according to the query physical plan.
...
...
include/util/taoserror.h
浏览文件 @
aee4c7a2
...
...
@@ -483,6 +483,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME TAOS_DEF_ERROR_CODE(0, 0x2617)
#define TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR TAOS_DEF_ERROR_CODE(0, 0x2618)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
#ifdef __cplusplus
}
#endif
...
...
include/util/tlockfree.h
浏览文件 @
aee4c7a2
...
...
@@ -76,6 +76,7 @@ void taosWLockLatch(SRWLatch *pLatch);
void
taosWUnLockLatch
(
SRWLatch
*
pLatch
);
void
taosRLockLatch
(
SRWLatch
*
pLatch
);
void
taosRUnLockLatch
(
SRWLatch
*
pLatch
);
int32_t
taosWTryLockLatch
(
SRWLatch
*
pLatch
);
// copy on read
#define taosCorBeginRead(x) \
...
...
source/client/inc/clientInt.h
浏览文件 @
aee4c7a2
...
...
@@ -163,6 +163,7 @@ typedef struct SReqResultInfo {
uint64_t
totalRows
;
uint32_t
current
;
bool
completed
;
int32_t
precision
;
int32_t
payloadLen
;
}
SReqResultInfo
;
...
...
source/client/src/clientImpl.c
浏览文件 @
aee4c7a2
...
...
@@ -254,7 +254,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
void
*
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
&
res
);
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
...
...
@@ -825,6 +825,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
pResultInfo
->
current
=
0
;
pResultInfo
->
completed
=
(
pRsp
->
completed
==
1
);
pResultInfo
->
payloadLen
=
htonl
(
pRsp
->
compLen
);
pResultInfo
->
precision
=
pRsp
->
precision
;
// TODO handle the compressed case
pResultInfo
->
totalRows
+=
pResultInfo
->
numOfRows
;
...
...
source/client/src/clientMain.c
浏览文件 @
aee4c7a2
...
...
@@ -323,7 +323,12 @@ int taos_affected_rows(TAOS_RES *res) {
}
int
taos_result_precision
(
TAOS_RES
*
res
)
{
return
TSDB_TIME_PRECISION_MILLI
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
return
TSDB_TIME_PRECISION_MILLI
;
}
return
pRequest
->
body
.
resInfo
.
precision
;
}
int
taos_select_db
(
TAOS
*
taos
,
const
char
*
db
)
{
...
...
source/client/test/clientTests.cpp
浏览文件 @
aee4c7a2
...
...
@@ -400,6 +400,7 @@ TEST(testCase, show_vgroup_Test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST
(
testCase
,
create_multiple_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -458,7 +459,7 @@ TEST(testCase, create_multiple_tables) {
taos_free_result
(
pRes
);
for (int32_t i = 0; i < 20; ++i) {
for
(
int32_t
i
=
0
;
i
<
2
500
0
;
++
i
)
{
char
sql
[
512
]
=
{
0
};
snprintf
(
sql
,
tListLen
(
sql
),
"create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)"
,
i
,
...
...
@@ -652,7 +653,6 @@ TEST(testCase, projection_query_stables) {
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
#endif
TEST
(
testCase
,
agg_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
source/common/src/tmsg.c
浏览文件 @
aee4c7a2
...
...
@@ -2769,6 +2769,48 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq)
return
0
;
}
int32_t
tSerializeSExplainRsp
(
void
*
buf
,
int32_t
bufLen
,
SExplainRsp
*
pRsp
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
numOfPlans
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfPlans
;
++
i
)
{
SExplainExecInfo
*
info
=
&
pRsp
->
subplanInfo
[
i
];
if
(
tEncodeU64
(
&
encoder
,
info
->
startupCost
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
info
->
totalCost
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
info
->
numOfRows
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSExplainRsp
(
void
*
buf
,
int32_t
bufLen
,
SExplainRsp
*
pRsp
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
numOfPlans
)
<
0
)
return
-
1
;
if
(
pRsp
->
numOfPlans
>
0
)
{
pRsp
->
subplanInfo
=
taosMemoryMalloc
(
pRsp
->
numOfPlans
*
sizeof
(
SExplainExecInfo
));
if
(
pRsp
->
subplanInfo
==
NULL
)
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfPlans
;
++
i
)
{
if
(
tDecodeU64
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
startupCost
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
totalCost
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
numOfRows
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSSchedulerHbReq
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbReq
*
pReq
)
{
int32_t
headLen
=
sizeof
(
SMsgHead
);
if
(
buf
!=
NULL
)
{
...
...
source/common/src/tvariant.c
浏览文件 @
aee4c7a2
...
...
@@ -1014,4 +1014,28 @@ int32_t taosVariantTypeSetType(SVariant *pVariant, char type) {
}
return
0
;
}
\ No newline at end of file
}
char
*
taosVariantGet
(
SVariant
*
pVar
,
int32_t
type
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_TIMESTAMP
:
return
(
char
*
)
&
pVar
->
i
;
case
TSDB_DATA_TYPE_DOUBLE
:
case
TSDB_DATA_TYPE_FLOAT
:
return
(
char
*
)
&
pVar
->
d
;
case
TSDB_DATA_TYPE_BINARY
:
return
(
char
*
)
pVar
->
pz
;
case
TSDB_DATA_TYPE_NCHAR
:
return
(
char
*
)
pVar
->
ucs4
;
default:
return
NULL
;
}
return
NULL
;
}
source/dnode/vnode/inc/tsdb.h
浏览文件 @
aee4c7a2
...
...
@@ -172,9 +172,9 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
queryHandle
,
STableBlockDistInfo
*
pTableBlockInfo
);
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
pReader
,
STableBlockDistInfo
*
pTableBlockInfo
);
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
p
TsdbReadHandle
);
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
p
Reader
);
/**
*
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
aee4c7a2
...
...
@@ -3046,8 +3046,8 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) {
// return code;
//}
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
p
TsdbReadHandle
)
{
return
((
STsdbReadHandle
*
)
p
TsdbReadHandle
)
->
cachelastrow
>
TSDB_CACHED_TYPE_NONE
;
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
p
Reader
)
{
return
((
STsdbReadHandle
*
)
p
Reader
)
->
cachelastrow
>
TSDB_CACHED_TYPE_NONE
;
}
int32_t
checkForCachedLastRow
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableGroupInfo
*
groupList
)
{
...
...
source/libs/command/inc/commandInt.h
浏览文件 @
aee4c7a2
...
...
@@ -26,38 +26,54 @@ extern "C" {
#define EXPLAIN_MAX_GROUP_NUM 100
//newline area
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s
columns=%d width=%d
"
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s
columns=%d width=%d
"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s
columns=%d width=%d
"
#define EXPLAIN_PROJECTION_FORMAT "Projection
columns=%d width=%d
"
#define EXPLAIN_JOIN_FORMAT "%s
between %d tables width=%d
"
#define EXPLAIN_AGG_FORMAT "Aggragate
functions=%d
"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1
width=%d
"
#define EXPLAIN_SORT_FORMAT "Sort
on %d Column(s) width=%d
"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s
functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d
"
#define EXPLAIN_SESSION_FORMAT "Session
gap=%" PRId64 " functions=%d width=%d
"
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s"
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s"
#define EXPLAIN_PROJECTION_FORMAT "Projection"
#define EXPLAIN_JOIN_FORMAT "%s"
#define EXPLAIN_AGG_FORMAT "Aggragate"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1"
#define EXPLAIN_SORT_FORMAT "Sort"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
#define EXPLAIN_SESSION_FORMAT "Session"
#define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s"
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
#define EXPLAIN_OUTPUT_FORMAT "Output: "
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
//append area
#define EXPLAIN_GROUPS_FORMAT " groups=%d"
#define EXPLAIN_WIDTH_FORMAT " width=%d"
#define EXPLAIN_LOOPS_FORMAT " loops=%d"
#define EXPLAIN_REVERSE_FORMAT " reverse=%d"
#define EXPLAIN_LEFT_PARENTHESIS_FORMAT " ("
#define EXPLAIN_RIGHT_PARENTHESIS_FORMAT ")"
#define EXPLAIN_BLANK_FORMAT " "
#define EXPLAIN_COST_FORMAT "cost=%.2f..%.2f"
#define EXPLAIN_ROWS_FORMAT "rows=%" PRIu64
#define EXPLAIN_COLUMNS_FORMAT "columns=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_GROUPS_FORMAT "groups=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_LOOPS_FORMAT "loops=%d"
#define EXPLAIN_REVERSE_FORMAT "reverse=%d"
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
#define EXPLAIN_EXECINFO_FORMAT "cost=%" PRIu64 "..%" PRIu64 " rows=%" PRIu64
typedef
struct
SExplainGroup
{
int32_t
nodeNum
;
int32_t
physiPlanExecNum
;
int32_t
physiPlanNum
;
int32_t
physiPlanExecIdx
;
SRWLatch
lock
;
SSubplan
*
plan
;
void
*
execInfo
;
//TODO
SArray
*
nodeExecInfo
;
//Array<SExplainRsp>
}
SExplainGroup
;
typedef
struct
SExplainResNode
{
SNodeList
*
pChildren
;
SPhysiNode
*
pNode
;
void
*
pExecInfo
;
SNodeList
*
pChildren
;
SPhysiNode
*
pNode
;
SArray
*
pExecInfo
;
// Array<SExplainExecInfo>
}
SExplainResNode
;
typedef
struct
SQueryExplainRowInfo
{
...
...
@@ -67,11 +83,21 @@ typedef struct SQueryExplainRowInfo {
}
SQueryExplainRowInfo
;
typedef
struct
SExplainCtx
{
int32_t
totalSize
;
bool
verbose
;
char
*
tbuf
;
SArray
*
rows
;
SHashObj
*
groupHash
;
EExplainMode
mode
;
double
ratio
;
bool
verbose
;
SRWLatch
lock
;
int32_t
rootGroupId
;
int32_t
dataSize
;
bool
execDone
;
int64_t
reqStartTs
;
int64_t
jobStartTs
;
int64_t
jobDoneTs
;
char
*
tbuf
;
SArray
*
rows
;
int32_t
groupDoneNum
;
SHashObj
*
groupHash
;
// Hash<SExplainGroup>
}
SExplainCtx
;
#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending")
...
...
source/libs/command/src/explain.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/executor/CMakeLists.txt
浏览文件 @
aee4c7a2
aux_source_directory
(
src EXECUTOR_SRC
)
#add_library(executor ${EXECUTOR_SRC})
#target_link_libraries(
# executor
# PRIVATE os util common function parser planner qcom tsdb
#)
add_library
(
executor STATIC
${
EXECUTOR_SRC
}
)
#set_target_properties(executor PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
# )
target_link_libraries
(
executor
PRIVATE os util common function parser planner qcom vnode scalar nodes
)
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
aee4c7a2
...
...
@@ -36,6 +36,7 @@ extern "C" {
#include "thash.h"
#include "tlockfree.h"
#include "tpagedbuf.h"
#include "tmsg.h"
struct
SColumnFilterElem
;
...
...
@@ -165,7 +166,7 @@ typedef struct STaskCostInfo {
typedef
struct
SOperatorCostInfo
{
uint64_t
openCost
;
uint64_t
exec
Cost
;
uint64_t
total
Cost
;
}
SOperatorCostInfo
;
typedef
struct
SOrder
{
...
...
@@ -238,6 +239,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
bool
*
newgroup
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
,
int32_t
num
);
typedef
int32_t
(
*
__optr_get_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
);
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
...
...
@@ -306,26 +308,27 @@ enum {
};
typedef
struct
SOperatorInfo
{
uint8_t
operatorType
;
bool
blockingOptr
;
// block operator or not
uint8_t
status
;
// denote if current operator is completed
int32_t
numOfOutput
;
// number of columns of the current operator results
char
*
name
;
// name, used to show the query execution plan
void
*
info
;
// extension attribution
SExprInfo
*
pExpr
;
STaskRuntimeEnv
*
pRuntimeEnv
;
// todo remove it
SExecTaskInfo
*
pTaskInfo
;
SOperatorCostInfo
cost
;
SResultInfo
resultInfo
;
struct
SOperatorInfo
**
pDownstream
;
// downstram pointer list
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
getStreamResFn
;
// execute the aggregate in the stream model.
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
uint8_t
operatorType
;
bool
blockingOptr
;
// block operator or not
uint8_t
status
;
// denote if current operator is completed
int32_t
numOfOutput
;
// number of columns of the current operator results
char
*
name
;
// name, used to show the query execution plan
void
*
info
;
// extension attribution
SExprInfo
*
pExpr
;
STaskRuntimeEnv
*
pRuntimeEnv
;
// todo remove it
SExecTaskInfo
*
pTaskInfo
;
SOperatorCostInfo
cost
;
SResultInfo
resultInfo
;
struct
SOperatorInfo
**
pDownstream
;
// downstram pointer list
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
getStreamResFn
;
// execute the aggregate in the stream model.
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_get_explain_fn_t
getExplainFn
;
}
SOperatorInfo
;
typedef
struct
{
...
...
@@ -385,6 +388,12 @@ typedef struct SExchangeInfo {
SLoadRemoteDataInfo
loadInfo
;
}
SExchangeInfo
;
typedef
struct
SColMatchInfo
{
int32_t
colId
;
int32_t
targetSlotId
;
bool
output
;
}
SColMatchInfo
;
typedef
struct
STableScanInfo
{
void
*
dataReader
;
int32_t
numOfBlocks
;
// extract basic running information.
...
...
@@ -497,8 +506,9 @@ typedef struct SAggOperatorInfo {
typedef
struct
SProjectOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SSDataBlock
*
existDataBlock
;
int32_t
threshold
;
SArray
*
pPseudoColInfo
;
SLimit
limit
;
int64_t
curOffset
;
int64_t
curOutput
;
...
...
@@ -544,6 +554,7 @@ typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
SNode
*
pCondition
;
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
...
...
@@ -623,13 +634,31 @@ typedef struct SDistinctOperatorInfo {
SHashObj
*
pSet
;
SSDataBlock
*
pRes
;
bool
recordNullVal
;
// has already record the null value, no need to try again
int64_t
threshold
;
int64_t
outputCapacity
;
int32_t
totalBytes
;
// int64_t threshold; // todo remove it
// int64_t outputCapacity;// todo remove it
// int32_t totalBytes; // todo remove it
SResultInfo
resInfo
;
char
*
buf
;
SArray
*
pDistinctDataInfo
;
}
SDistinctOperatorInfo
;
int32_t
operatorDummyOpenFn
(
SOperatorInfo
*
pOperator
);
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
int32_t
initAggInfo
(
SOptrBasicInfo
*
pBasicInfo
,
SAggSupporter
*
pAggSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
int32_t
numOfRows
,
SSDataBlock
*
pResultBlock
,
const
char
*
pkey
);
void
toSDatablock
(
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
,
SSDataBlock
*
pBlock
,
int32_t
rowCapacity
,
int32_t
*
rowCellOffset
);
void
finalizeMultiTupleQueryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
doApplyFunctions
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
);
int32_t
setGroupResultOutputBuf_rv
(
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
int32_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SExecTaskInfo
*
pTaskInfo
,
SAggSupporter
*
pAggSup
);
void
doDestroyBasicInfo
(
SOptrBasicInfo
*
pInfo
,
int32_t
numOfOutput
);
int32_t
setSDataBlockFromFetchRsp
(
SSDataBlock
*
pRes
,
SLoadRemoteDataInfo
*
pLoadInfo
,
int32_t
numOfRows
,
char
*
pData
,
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pTsdbReadHandle
,
int32_t
order
,
int32_t
numOfCols
,
int32_t
repeatTime
,
int32_t
reverseTime
,
SArray
*
pColMatchInfo
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
);
...
...
@@ -645,14 +674,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
const
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SArray
*
pGroupColList
,
S
Node
*
pCondition
,
S
ExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createDataBlockInfoScanOperator
(
void
*
dataReader
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
SSDataBlock
*
pResBlock
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
SSDataBlock
*
pResBlock
,
int32_t
fillType
,
char
*
fillVal
,
bool
multigroupResult
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createDistinctOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createDistinctOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableSeqScanOperatorInfo
(
void
*
pTsdbReadHandle
,
STaskRuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
...
...
@@ -700,6 +729,7 @@ int32_t getMaximumIdleDurationSec();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SExplainExecInfo
**
pRes
,
int32_t
*
capacity
,
int32_t
*
resNum
);
#ifdef __cplusplus
}
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
aee4c7a2
...
...
@@ -229,3 +229,12 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) {
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
doDestroyTask
(
pTaskInfo
);
}
int32_t
qGetExplainExecInfo
(
qTaskInfo_t
tinfo
,
int32_t
*
resNum
,
SExplainExecInfo
**
pRes
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
capacity
=
0
;
return
getOperatorExplainExecInfo
(
pTaskInfo
->
pRoot
,
pRes
,
&
capacity
,
resNum
);
}
source/libs/executor/src/executorimpl.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/executor/src/groupoperator.c
0 → 100644
浏览文件 @
aee4c7a2
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "function.h"
#include "tname.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
static
void
destroyGroupbyOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SGroupbyOperatorInfo
*
pInfo
=
(
SGroupbyOperatorInfo
*
)
param
;
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
taosMemoryFreeClear
(
pInfo
->
keyBuf
);
taosArrayDestroy
(
pInfo
->
pGroupCols
);
taosArrayDestroy
(
pInfo
->
pGroupColVals
);
}
static
int32_t
initGroupOptrInfo
(
SGroupbyOperatorInfo
*
pInfo
,
SArray
*
pGroupColList
)
{
pInfo
->
pGroupColVals
=
taosArrayInit
(
4
,
sizeof
(
SGroupKeys
));
if
(
pInfo
->
pGroupColVals
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
numOfGroupCols
=
taosArrayGetSize
(
pGroupColList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroupCols
;
++
i
)
{
SColumn
*
pCol
=
taosArrayGet
(
pGroupColList
,
i
);
pInfo
->
groupKeyLen
+=
pCol
->
bytes
;
struct
SGroupKeys
key
=
{
0
};
key
.
bytes
=
pCol
->
bytes
;
key
.
type
=
pCol
->
type
;
key
.
isNull
=
false
;
key
.
pData
=
taosMemoryCalloc
(
1
,
pCol
->
bytes
);
if
(
key
.
pData
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
taosArrayPush
(
pInfo
->
pGroupColVals
,
&
key
);
}
int32_t
nullFlagSize
=
sizeof
(
int8_t
)
*
numOfGroupCols
;
pInfo
->
keyBuf
=
taosMemoryCalloc
(
1
,
pInfo
->
groupKeyLen
+
nullFlagSize
);
if
(
pInfo
->
keyBuf
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
static
bool
groupKeyCompare
(
SGroupbyOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
,
int32_t
numOfGroupCols
)
{
SColumnDataAgg
*
pColAgg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfGroupCols
;
++
i
)
{
SColumn
*
pCol
=
taosArrayGet
(
pInfo
->
pGroupCols
,
i
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pCol
->
slotId
);
if
(
pBlock
->
pBlockAgg
!=
NULL
)
{
pColAgg
=
&
pBlock
->
pBlockAgg
[
pCol
->
slotId
];
// TODO is agg data matched?
}
bool
isNull
=
colDataIsNull
(
pColInfoData
,
pBlock
->
info
.
rows
,
rowIndex
,
pColAgg
);
SGroupKeys
*
pkey
=
taosArrayGet
(
pInfo
->
pGroupColVals
,
i
);
if
(
pkey
->
isNull
&&
isNull
)
{
continue
;
}
if
(
isNull
||
pkey
->
isNull
)
{
return
false
;
}
char
*
val
=
colDataGetData
(
pColInfoData
,
rowIndex
);
if
(
IS_VAR_DATA_TYPE
(
pkey
->
type
))
{
int32_t
len
=
varDataLen
(
val
);
if
(
len
==
varDataLen
(
pkey
->
pData
)
&&
memcmp
(
varDataVal
(
pkey
->
pData
),
varDataVal
(
val
),
len
)
==
0
)
{
continue
;
}
else
{
return
false
;
}
}
else
{
if
(
memcmp
(
pkey
->
pData
,
val
,
pkey
->
bytes
)
!=
0
)
{
return
false
;
}
}
}
return
true
;
}
static
void
keepGroupKeys
(
SGroupbyOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
,
int32_t
numOfGroupCols
)
{
SColumnDataAgg
*
pColAgg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfGroupCols
;
++
i
)
{
SColumn
*
pCol
=
taosArrayGet
(
pInfo
->
pGroupCols
,
i
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pCol
->
slotId
);
if
(
pBlock
->
pBlockAgg
!=
NULL
)
{
pColAgg
=
&
pBlock
->
pBlockAgg
[
pCol
->
slotId
];
// TODO is agg data matched?
}
SGroupKeys
*
pkey
=
taosArrayGet
(
pInfo
->
pGroupColVals
,
i
);
if
(
colDataIsNull
(
pColInfoData
,
pBlock
->
info
.
rows
,
rowIndex
,
pColAgg
))
{
pkey
->
isNull
=
true
;
}
else
{
char
*
val
=
colDataGetData
(
pColInfoData
,
rowIndex
);
if
(
IS_VAR_DATA_TYPE
(
pkey
->
type
))
{
memcpy
(
pkey
->
pData
,
val
,
varDataTLen
(
val
));
}
else
{
memcpy
(
pkey
->
pData
,
val
,
pkey
->
bytes
);
}
}
}
}
static
int32_t
generatedHashKey
(
void
*
pKey
,
int32_t
*
length
,
SArray
*
pGroupColVals
)
{
ASSERT
(
pKey
!=
NULL
);
size_t
numOfGroupCols
=
taosArrayGetSize
(
pGroupColVals
);
char
*
isNull
=
(
char
*
)
pKey
;
char
*
pStart
=
(
char
*
)
pKey
+
sizeof
(
int8_t
)
*
numOfGroupCols
;
for
(
int32_t
i
=
0
;
i
<
numOfGroupCols
;
++
i
)
{
SGroupKeys
*
pkey
=
taosArrayGet
(
pGroupColVals
,
i
);
if
(
pkey
->
isNull
)
{
isNull
[
i
]
=
1
;
continue
;
}
isNull
[
i
]
=
0
;
if
(
IS_VAR_DATA_TYPE
(
pkey
->
type
))
{
varDataCopy
(
pStart
,
pkey
->
pData
);
pStart
+=
varDataTLen
(
pkey
->
pData
);
ASSERT
(
varDataTLen
(
pkey
->
pData
)
<=
pkey
->
bytes
);
}
else
{
memcpy
(
pStart
,
pkey
->
pData
,
pkey
->
bytes
);
pStart
+=
pkey
->
bytes
;
}
}
*
length
=
(
pStart
-
(
char
*
)
pKey
);
return
0
;
}
// assign the group keys or user input constant values if required
static
void
doAssignGroupKeys
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
totalRows
,
int32_t
rowIndex
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
if
(
pCtx
[
i
].
functionId
==
-
1
)
{
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
&
pCtx
[
i
]);
SColumnInfoData
*
pColInfoData
=
pCtx
[
i
].
input
.
pData
[
0
];
if
(
!
colDataIsNull
(
pColInfoData
,
totalRows
,
rowIndex
,
NULL
))
{
char
*
dest
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
char
*
data
=
colDataGetData
(
pColInfoData
,
rowIndex
);
// set result exists, todo refactor
memcpy
(
dest
,
data
,
pColInfoData
->
info
.
bytes
);
pEntryInfo
->
hasResult
=
DATA_SET_FLAG
;
pEntryInfo
->
numOfRes
=
1
;
}
}
}
}
static
void
doHashGroupbyAgg
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SqlFunctionCtx
*
pCtx
=
pInfo
->
binfo
.
pCtx
;
int32_t
numOfGroupCols
=
taosArrayGetSize
(
pInfo
->
pGroupCols
);
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
// qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
// return;
// }
int32_t
len
=
0
;
STimeWindow
w
=
TSWINDOW_INITIALIZER
;
int32_t
num
=
0
;
for
(
int32_t
j
=
0
;
j
<
pBlock
->
info
.
rows
;
++
j
)
{
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if
(
!
pInfo
->
isInit
)
{
keepGroupKeys
(
pInfo
,
pBlock
,
j
,
numOfGroupCols
);
pInfo
->
isInit
=
true
;
num
++
;
continue
;
}
bool
equal
=
groupKeyCompare
(
pInfo
,
pBlock
,
j
,
numOfGroupCols
);
if
(
equal
)
{
num
++
;
continue
;
}
/*int32_t ret = */
generatedHashKey
(
pInfo
->
keyBuf
,
&
len
,
pInfo
->
pGroupColVals
);
int32_t
ret
=
setGroupResultOutputBuf_rv
(
&
(
pInfo
->
binfo
),
pOperator
->
numOfOutput
,
pInfo
->
keyBuf
,
TSDB_DATA_TYPE_VARCHAR
,
len
,
0
,
pInfo
->
aggSup
.
pResultBuf
,
pTaskInfo
,
&
pInfo
->
aggSup
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
int32_t
rowIndex
=
j
-
num
;
doApplyFunctions
(
pCtx
,
&
w
,
NULL
,
rowIndex
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfOutput
,
TSDB_ORDER_ASC
);
// assign the group keys or user input constant values if required
doAssignGroupKeys
(
pCtx
,
pOperator
->
numOfOutput
,
pBlock
->
info
.
rows
,
rowIndex
);
keepGroupKeys
(
pInfo
,
pBlock
,
j
,
numOfGroupCols
);
num
=
1
;
}
if
(
num
>
0
)
{
/*int32_t ret = */
generatedHashKey
(
pInfo
->
keyBuf
,
&
len
,
pInfo
->
pGroupColVals
);
int32_t
ret
=
setGroupResultOutputBuf_rv
(
&
(
pInfo
->
binfo
),
pOperator
->
numOfOutput
,
pInfo
->
keyBuf
,
TSDB_DATA_TYPE_VARCHAR
,
len
,
0
,
pInfo
->
aggSup
.
pResultBuf
,
pTaskInfo
,
&
pInfo
->
aggSup
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
int32_t
rowIndex
=
pBlock
->
info
.
rows
-
num
;
doApplyFunctions
(
pCtx
,
&
w
,
NULL
,
rowIndex
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfOutput
,
TSDB_ORDER_ASC
);
doAssignGroupKeys
(
pCtx
,
pOperator
->
numOfOutput
,
pBlock
->
info
.
rows
,
rowIndex
);
}
}
static
SSDataBlock
*
hashGroupbyAggregate
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
toSDatablock
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
,
pRes
,
pInfo
->
binfo
.
capacity
,
pInfo
->
binfo
.
rowCellInfoOffset
);
if
(
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
pRes
;
}
int32_t
order
=
TSDB_ORDER_ASC
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
getNextFn
(
downstream
,
newgroup
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
doHashGroupbyAgg
(
pOperator
,
pBlock
);
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
finalizeMultiTupleQueryResult
(
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
,
pInfo
->
binfo
.
rowCellInfoOffset
);
// if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
// } else {
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo,
// pInfo->binfo.rowCellInfoOffset);
// }
blockDataEnsureCapacity
(
pRes
,
pInfo
->
binfo
.
capacity
);
initGroupResInfo
(
&
pInfo
->
groupResInfo
,
&
pInfo
->
binfo
.
resultRowInfo
);
while
(
1
)
{
toSDatablock
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
,
pRes
,
pInfo
->
binfo
.
capacity
,
pInfo
->
binfo
.
rowCellInfoOffset
);
doFilter
(
pInfo
->
pCondition
,
pRes
);
bool
hasRemain
=
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
break
;
}
if
(
pRes
->
info
.
rows
>
0
)
{
break
;
}
}
return
pInfo
->
binfo
.
pRes
;
}
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
)
{
SGroupbyOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
pGroupCols
=
pGroupColList
;
pInfo
->
pCondition
=
pCondition
;
initAggInfo
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
4096
,
pResultBlock
,
pTaskInfo
->
id
.
str
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
int32_t
code
=
initGroupOptrInfo
(
pInfo
,
pGroupColList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
// pOperator->operatorType = OP_Groupby;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfOutput
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
_openFn
=
operatorDummyOpenFn
;
pOperator
->
getNextFn
=
hashGroupbyAggregate
;
pOperator
->
closeFn
=
destroyGroupbyOperatorInfo
;
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
#define MULTI_KEY_DELIM "-"
static
void
destroyDistinctOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SDistinctOperatorInfo
*
pInfo
=
(
SDistinctOperatorInfo
*
)
param
;
taosHashCleanup
(
pInfo
->
pSet
);
taosMemoryFreeClear
(
pInfo
->
buf
);
taosArrayDestroy
(
pInfo
->
pDistinctDataInfo
);
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
}
static
void
buildMultiDistinctKey
(
SDistinctOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
int32_t
rowId
)
{
char
*
p
=
pInfo
->
buf
;
// memset(p, 0, pInfo->totalBytes);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pDistinctDataInfo
);
i
++
)
{
SDistinctDataInfo
*
pDistDataInfo
=
(
SDistinctDataInfo
*
)
taosArrayGet
(
pInfo
->
pDistinctDataInfo
,
i
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pDistDataInfo
->
index
);
char
*
val
=
((
char
*
)
pColDataInfo
->
pData
)
+
pColDataInfo
->
info
.
bytes
*
rowId
;
if
(
isNull
(
val
,
pDistDataInfo
->
type
))
{
p
+=
pDistDataInfo
->
bytes
;
continue
;
}
if
(
IS_VAR_DATA_TYPE
(
pDistDataInfo
->
type
))
{
memcpy
(
p
,
varDataVal
(
val
),
varDataLen
(
val
));
p
+=
varDataLen
(
val
);
}
else
{
memcpy
(
p
,
val
,
pDistDataInfo
->
bytes
);
p
+=
pDistDataInfo
->
bytes
;
}
memcpy
(
p
,
MULTI_KEY_DELIM
,
strlen
(
MULTI_KEY_DELIM
));
p
+=
strlen
(
MULTI_KEY_DELIM
);
}
}
static
bool
initMultiDistinctInfo
(
SDistinctOperatorInfo
*
pInfo
,
SOperatorInfo
*
pOperator
)
{
for
(
int
i
=
0
;
i
<
pOperator
->
numOfOutput
;
i
++
)
{
// pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
}
#if 0
for (int i = 0; i < pOperator->numOfOutput; i++) {
int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock));
assert(i < numOfCols);
for (int j = 0; j < numOfCols; j++) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) {
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
}
}
}
#endif
// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes);
return
taosArrayGetSize
(
pInfo
->
pDistinctDataInfo
)
==
pOperator
->
numOfOutput
?
true
:
false
;
}
static
SSDataBlock
*
hashDistinct
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SDistinctOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
pRes
->
info
.
rows
=
0
;
SSDataBlock
*
pBlock
=
NULL
;
SOperatorInfo
*
pDownstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
pDownstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
pBlock
=
pDownstream
->
getNextFn
(
pDownstream
,
newgroup
);
publishOperatorProfEvent
(
pDownstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
// ensure result output buf
if
(
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
>
pInfo
->
resInfo
.
capacity
)
{
int32_t
newSize
=
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pRes
->
pDataBlock
);
i
++
)
{
SColumnInfoData
*
pResultColInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
i
);
SDistinctDataInfo
*
pDistDataInfo
=
taosArrayGet
(
pInfo
->
pDistinctDataInfo
,
i
);
// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
// if (tmp == NULL) {
// return NULL;
// } else {
// pResultColInfoData->pData = tmp;
// }
}
pInfo
->
resInfo
.
capacity
=
newSize
;
}
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
buildMultiDistinctKey
(
pInfo
,
pBlock
,
i
);
if
(
taosHashGet
(
pInfo
->
pSet
,
pInfo
->
buf
,
0
)
==
NULL
)
{
taosHashPut
(
pInfo
->
pSet
,
pInfo
->
buf
,
0
,
NULL
,
0
);
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
pRes
->
pDataBlock
);
j
++
)
{
SDistinctDataInfo
*
pDistDataInfo
=
taosArrayGet
(
pInfo
->
pDistinctDataInfo
,
j
);
// distinct meta info
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pDistDataInfo
->
index
);
// src
SColumnInfoData
*
pResultColInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
j
);
// dist
char
*
val
=
((
char
*
)
pColInfoData
->
pData
)
+
pDistDataInfo
->
bytes
*
i
;
char
*
start
=
pResultColInfoData
->
pData
+
pDistDataInfo
->
bytes
*
pInfo
->
pRes
->
info
.
rows
;
memcpy
(
start
,
val
,
pDistDataInfo
->
bytes
);
}
pRes
->
info
.
rows
+=
1
;
}
}
if
(
pRes
->
info
.
rows
>=
pInfo
->
resInfo
.
threshold
)
{
break
;
}
}
return
(
pInfo
->
pRes
->
info
.
rows
>
0
)
?
pInfo
->
pRes
:
NULL
;
}
SOperatorInfo
*
createDistinctOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SDistinctOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SDistinctOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pOperator
->
resultInfo
.
capacity
=
4096
;
// todo extract function.
// pInfo->totalBytes = 0;
pInfo
->
buf
=
NULL
;
pInfo
->
pDistinctDataInfo
=
taosArrayInit
(
numOfCols
,
sizeof
(
SDistinctDataInfo
));
initMultiDistinctInfo
(
pInfo
,
pOperator
);
pInfo
->
pSet
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
pOperator
->
name
=
"DistinctOperator"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
// pOperator->operatorType = DISTINCT;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
getNextFn
=
hashDistinct
;
pOperator
->
closeFn
=
destroyDistinctOperatorInfo
;
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pOperator
);
return
NULL
;
}
source/libs/executor/src/scanoperator.c
0 → 100644
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/executor/src/tsimplehash.c
浏览文件 @
aee4c7a2
...
...
@@ -37,7 +37,7 @@ typedef struct SHNode {
char
data
[];
}
SHNode
;
typedef
struct
SSHashObj
{
struct
SSHashObj
{
SHNode
**
hashList
;
size_t
capacity
;
// number of slots
int64_t
size
;
// number of elements in hash table
...
...
@@ -45,7 +45,7 @@ typedef struct SSHashObj {
_equal_fn_t
equalFp
;
// equal function
int32_t
keyLen
;
int32_t
dataLen
;
}
SSHashObj
;
};
static
FORCE_INLINE
int32_t
taosHashCapacity
(
int32_t
length
)
{
int32_t
len
=
MIN
(
length
,
HASH_MAX_CAPACITY
);
...
...
@@ -107,7 +107,7 @@ static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pDat
return
pNewNode
;
}
void
taosHashTableResize
(
SSHashObj
*
pHashObj
)
{
static
void
taosHashTableResize
(
SSHashObj
*
pHashObj
)
{
if
(
!
HASH_NEED_RESIZE
(
pHashObj
))
{
return
;
}
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
aee4c7a2
...
...
@@ -26,31 +26,34 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void
functionFinalize
(
SqlFunctionCtx
*
pCtx
);
bool
getCountFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
void
countFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
countFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getSumFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
void
sumFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sumFunction
(
SqlFunctionCtx
*
pCtx
);
bool
minFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
maxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
getMinmaxFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
void
minFunction
(
SqlFunctionCtx
*
pCtx
);
void
maxFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
minFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
maxFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getStddevFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
stddevFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
void
stddevFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
stddevFunction
(
SqlFunctionCtx
*
pCtx
);
void
stddevFinalize
(
SqlFunctionCtx
*
pCtx
);
bool
getPercentileFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
percentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
void
percentileFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
percentileFunction
(
SqlFunctionCtx
*
pCtx
);
void
percentileFinalize
(
SqlFunctionCtx
*
pCtx
);
bool
get
FirstLast
FuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
void
firstFunction
(
SqlFunctionCtx
*
pCtx
);
void
last
Function
(
SqlFunctionCtx
*
pCtx
);
bool
get
Diff
FuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
diffFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
diff
Function
(
SqlFunctionCtx
*
pCtx
);
void
valFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getFirstLastFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
firstFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
lastFunction
(
SqlFunctionCtx
*
pCtx
);
#ifdef __cplusplus
}
...
...
source/libs/function/inc/taggfunction.h
浏览文件 @
aee4c7a2
...
...
@@ -52,8 +52,6 @@ typedef struct SInterpInfoDetail {
int8_t
primaryCol
;
}
SInterpInfoDetail
;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
typedef
struct
STwaInfo
{
int8_t
hasResult
;
// flag to denote has value
double
dOutput
;
...
...
source/libs/function/src/builtins.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/function/src/builtinsimpl.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/function/src/functionMgt.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/function/src/taggfunction.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/nodes/inc/nodesUtil.h
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/parser/inc/parAst.h
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/parser/inc/sql.y
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/parser/src/parAstCreater.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/parser/src/parTranslater.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/parser/src/sql.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/planner/src/planLogicCreater.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/planner/test/plannerTest.cpp
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/qworker/inc/qworkerMsg.h
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/qworker/src/qworker.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scalar/inc/sclvector.h
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scalar/src/filter.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scalar/src/scalar.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scalar/src/sclfunc.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scalar/src/sclvector.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scalar/test/filter/filterTests.cpp
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scheduler/src/scheduler.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/util/src/terror.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/util/src/thash.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
source/util/src/tlockfree.c
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
tests/script/tsim/query/diff.sim
0 → 100644
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
tests/script/tsim/query/session.sim
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
tests/script/tsim/query/stddev.sim
0 → 100644
浏览文件 @
aee4c7a2
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录