Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a4015fdb
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a4015fdb
编写于
5月 10, 2022
作者:
H
Haojun Liao
提交者:
GitHub
5月 10, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12315 from taosdata/feature/3.0_liaohj
refactor: do some internal refactor.
上级
f718be6e
119874ac
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
58 addition
and
131 deletion
+58
-131
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-1
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+2
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+7
-6
source/client/src/clientMain.c
source/client/src/clientMain.c
+5
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-16
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+29
-97
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+2
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+4
-4
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+1
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+1
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+4
-1
未找到文件。
source/client/inc/clientInt.h
浏览文件 @
a4015fdb
...
...
@@ -254,7 +254,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t
int
genericRspCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
SMsgSendInfo
*
buildMsgInfoImpl
(
SRequestObj
*
pReqObj
);
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
);
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
int32_t
connType
,
SAppInstInfo
*
pAppInfo
);
void
destroyTscObj
(
void
*
pObj
);
STscObj
*
acquireTscObj
(
int64_t
rid
);
int32_t
releaseTscObj
(
int64_t
rid
);
...
...
source/client/src/clientEnv.c
浏览文件 @
a4015fdb
...
...
@@ -131,7 +131,7 @@ void destroyTscObj(void *pObj) {
taosMemoryFreeClear
(
pTscObj
);
}
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
)
{
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
int32_t
connType
,
SAppInstInfo
*
pAppInfo
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STscObj
));
if
(
NULL
==
pObj
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -145,6 +145,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
return
NULL
;
}
pObj
->
connType
=
connType
;
pObj
->
pAppInfo
=
pAppInfo
;
tstrncpy
(
pObj
->
user
,
user
,
sizeof
(
pObj
->
user
));
memcpy
(
pObj
->
pass
,
auth
,
TSDB_PASSWORD_LEN
);
...
...
source/client/src/clientImpl.c
浏览文件 @
a4015fdb
...
...
@@ -25,7 +25,7 @@
#include "tref.h"
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
,
int8_t
connType
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
static
bool
stringLengthCheck
(
const
char
*
str
,
size_t
maxsize
)
{
...
...
@@ -491,7 +491,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
,
int
connType
)
{
STscObj
*
pTscObj
=
createTscObj
(
user
,
auth
,
db
,
pAppInfo
);
STscObj
*
pTscObj
=
createTscObj
(
user
,
auth
,
db
,
connType
,
pAppInfo
);
if
(
NULL
==
pTscObj
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
pTscObj
;
...
...
@@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return
NULL
;
}
SMsgSendInfo
*
body
=
buildConnectMsg
(
pRequest
,
connType
);
SMsgSendInfo
*
body
=
buildConnectMsg
(
pRequest
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
transporterId
,
body
);
...
...
@@ -527,7 +527,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return
pTscObj
;
}
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
,
int8_t
connType
)
{
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
)
{
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
pMsgSendInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -550,9 +550,10 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
}
taosMemoryFreeClear
(
db
);
connectReq
.
connType
=
connType
;
connectReq
.
pid
=
htonl
(
appInfo
.
pid
);
connectReq
.
connType
=
pObj
->
connType
;
connectReq
.
pid
=
htonl
(
appInfo
.
pid
);
connectReq
.
startTime
=
htobe64
(
appInfo
.
startTime
);
tstrncpy
(
connectReq
.
app
,
appInfo
.
appName
,
sizeof
(
connectReq
.
app
));
tstrncpy
(
connectReq
.
user
,
pObj
->
user
,
sizeof
(
connectReq
.
user
));
tstrncpy
(
connectReq
.
passwd
,
pObj
->
pass
,
sizeof
(
connectReq
.
passwd
));
...
...
source/client/src/clientMain.c
浏览文件 @
a4015fdb
...
...
@@ -563,7 +563,11 @@ const char *taos_get_server_info(TAOS *taos) {
}
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
)
{
// TODO
if
(
taos
==
NULL
||
sql
==
NULL
)
{
// todo directly call fp
}
taos_query_l
(
taos
,
sql
,
(
int32_t
)
strlen
(
sql
));
}
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
a4015fdb
...
...
@@ -62,11 +62,6 @@ enum {
* 2. when all data within queried time window, it is also denoted as query_completed
*/
TASK_COMPLETED
=
0x2u
,
/* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option
*/
TASK_OVER
=
0x4u
,
};
typedef
struct
SResultRowCell
{
...
...
@@ -288,12 +283,6 @@ typedef struct SOperatorInfo {
SOperatorFpSet
fpSet
;
}
SOperatorInfo
;
typedef
struct
{
int32_t
numOfTags
;
int32_t
numOfCols
;
SColumnInfo
*
colList
;
}
SQueriedTableInfo
;
typedef
enum
{
EX_SOURCE_DATA_NOT_READY
=
0x1
,
EX_SOURCE_DATA_READY
=
0x2
,
...
...
@@ -629,8 +618,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
void
initResultSizeInfo
(
SOperatorInfo
*
pOperator
,
int32_t
numOfRows
);
void
doBuildResultDatablock
(
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
);
void
finalizeMultiTupleQueryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
finalizeMultiTupleQueryResult
(
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
doApplyFunctions
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
);
int32_t
setGroupResultOutputBuf
(
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
...
...
@@ -711,8 +699,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
#endif
int32_t
projectApplyFunctions
(
SExprInfo
*
pExpr
,
SSDataBlock
*
pResult
,
SSDataBlock
*
pSrcBlock
,
SqlFunctionCtx
*
pCtx
,
...
...
@@ -737,7 +723,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
getMaximumIdleDurationSec
();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
);
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
a4015fdb
...
...
@@ -202,7 +202,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
return
isTaskKilled
(
pTaskInfo
)
||
Q_STATUS_EQUAL
(
pTaskInfo
->
status
,
TASK_OVER
)
;
return
isTaskKilled
(
pTaskInfo
);
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
a4015fdb
...
...
@@ -107,7 +107,6 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
static
SColumnInfo
*
extractColumnFilterInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
*
numOfFilterCols
);
static
int32_t
setTimestampListJoinInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SVariant
*
pTag
,
STableQueryInfo
*
pTableQueryInfo
);
static
void
releaseQueryBuf
(
size_t
numOfTables
);
static
int32_t
getNumOfScanTimes
(
STaskAttr
*
pQueryAttr
);
...
...
@@ -620,7 +619,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
continue
;
}
if
(
functionNeedToExecute
(
&
pCtx
[
k
]))
{
if
(
functionNeedToExecute
(
&
pCtx
[
k
])
&&
pCtx
[
k
].
fpSet
.
process
!=
NULL
)
{
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
}
...
...
@@ -803,9 +802,12 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
static
void
doAggregateImpl
(
SOperatorInfo
*
pOperator
,
TSKEY
startTs
,
SqlFunctionCtx
*
pCtx
)
{
for
(
int32_t
k
=
0
;
k
<
pOperator
->
numOfExprs
;
++
k
)
{
if
(
functionNeedToExecute
(
&
pCtx
[
k
]))
{
pCtx
[
k
].
startTs
=
startTs
;
// this can be set during create the struct
if
(
pCtx
[
k
].
fpSet
.
process
!=
NULL
)
pCtx
[
k
].
startTs
=
startTs
;
// this can be set during create the struct
// todo add a dummy funtion to avoid process check
if
(
pCtx
[
k
].
fpSet
.
process
!=
NULL
)
{
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
}
}
}
}
...
...
@@ -922,6 +924,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
static
void
setResultRowKey
(
SResultRow
*
pResultRow
,
char
*
pData
,
int16_t
type
)
{
if
(
IS_VAR_DATA_TYPE
(
type
))
{
// todo disable this
// if (pResultRow->key == NULL) {
// pResultRow->key = taosMemoryMalloc(varDataTLen(pData));
// varDataCopy(pResultRow->key, pData);
...
...
@@ -1075,7 +1078,7 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock*
}
// set the output buffer for the selectivity + tag query
static
int32_t
set
CtxTag
ColumnInfo
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
static
int32_t
set
SelectValue
ColumnInfo
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
int32_t
num
=
0
;
SqlFunctionCtx
*
p
=
NULL
;
...
...
@@ -1087,7 +1090,7 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
if
(
strcmp
(
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
pValCtx
[
num
++
]
=
&
pCtx
[
i
];
}
else
{
}
else
if
(
fmIsAggFunc
(
pCtx
[
i
].
functionId
))
{
p
=
&
pCtx
[
i
];
}
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
...
...
@@ -1215,7 +1218,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
(
int32_t
)((
*
rowCellInfoOffset
)[
i
-
1
]
+
sizeof
(
SResultRowEntryInfo
)
+
pFuncCtx
[
i
-
1
].
resDataInfo
.
interBufSize
);
}
set
CtxTag
ColumnInfo
(
pFuncCtx
,
numOfOutput
);
set
SelectValue
ColumnInfo
(
pFuncCtx
,
numOfOutput
);
return
pFuncCtx
;
}
...
...
@@ -1451,8 +1454,6 @@ static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, in
}
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...
...
@@ -2009,8 +2010,8 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
}
// todo merged with the build group result.
void
finalizeMultiTupleQueryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
)
{
void
finalizeMultiTupleQueryResult
(
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
)
{
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
SResultRowPosition
*
pPos
=
&
pResultRowInfo
->
pPosition
[
i
];
...
...
@@ -2023,17 +2024,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
// }
for
(
int32_t
j
=
0
;
j
<
numOfOutput
;
++
j
)
{
pCtx
[
j
].
resultInfo
=
getResultCell
(
pRow
,
j
,
rowCellInfoOffset
);
struct
SResultRowEntryInfo
*
pResInfo
=
pCtx
[
j
].
resultInfo
;
struct
SResultRowEntryInfo
*
pResInfo
=
getResultCell
(
pRow
,
j
,
rowCellInfoOffset
);
if
(
!
isRowEntryInitialized
(
pResInfo
))
{
continue
;
}
if
(
pCtx
[
j
].
fpSet
.
process
)
{
// TODO set the dummy function, to avoid the check for null ptr.
// pCtx[j].fpSet.finalize(&pCtx[j]);
}
if
(
pRow
->
numOfRows
<
pResInfo
->
numOfRes
)
{
pRow
->
numOfRows
=
pResInfo
->
numOfRes
;
}
...
...
@@ -2187,17 +2182,15 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
int32_t
numOfResult
=
pBlock
->
info
.
rows
;
// there are already exists result rows
int32_t
start
=
0
;
int32_t
step
=
-
1
;
int32_t
step
=
1
;
// qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv));
assert
(
orderType
==
TSDB_ORDER_ASC
||
orderType
==
TSDB_ORDER_DESC
);
if
(
orderType
==
TSDB_ORDER_ASC
)
{
start
=
pGroupResInfo
->
index
;
step
=
1
;
}
else
{
// desc order copy all data
start
=
numOfRows
-
pGroupResInfo
->
index
-
1
;
step
=
-
1
;
}
for
(
int32_t
i
=
start
;
(
i
<
numOfRows
)
&&
(
i
>=
0
);
i
+=
step
)
{
...
...
@@ -2227,10 +2220,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
}
else
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
// expand the result into multiple rows. E.g., _wstartts, top(k, 20)
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
}
}
...
...
@@ -3682,10 +3678,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SOptrBasicInfo
*
pInfo
=
&
pAggInfo
->
binfo
;
int32_t
order
=
TSDB_ORDER_ASC
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
bool
newgroup
=
true
;
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
...
...
@@ -3698,6 +3692,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
int32_t
order
=
getTableScanOrder
(
pOperator
);
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if
(
pAggInfo
->
pScalarExprInfo
!=
NULL
)
{
int32_t
code
=
projectApplyFunctions
(
pAggInfo
->
pScalarExprInfo
,
pBlock
,
pBlock
,
pAggInfo
->
pScalarCtx
,
...
...
@@ -3730,8 +3726,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
closeAllResultRows
(
&
pAggInfo
->
binfo
.
resultRowInfo
);
finalizeMultiTupleQueryResult
(
p
AggInfo
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
pAggInfo
->
aggSup
.
pResultBuf
,
&
pAggInfo
->
binfo
.
resultRowInfo
,
pAggInfo
->
binfo
.
rowCellInfoOffset
);
finalizeMultiTupleQueryResult
(
p
Operator
->
numOfExprs
,
pAggInfo
->
aggSup
.
pResultBuf
,
&
pAggInfo
->
binfo
.
resultRowInfo
,
pAggInfo
->
binfo
.
rowCellInfoOffset
);
initGroupedResultInfo
(
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultRowHashTable
,
false
);
OPTR_SET_OPENED
(
pOperator
);
...
...
@@ -4014,7 +4010,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
}
// todo dynamic set tags
// todo set tags
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
...
...
@@ -4028,7 +4025,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pTaskInfo
->
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
...
...
@@ -4279,11 +4275,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
SArray
*
pa
=
taosArrayGetP
(
pTableGroupInfo
->
pGroupList
,
i
);
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pa
);
++
j
)
{
STableKeyInfo
*
pk
=
taosArrayGet
(
pa
,
j
);
STableQueryInfo
*
pTQueryInfo
=
&
pTableQueryInfo
[
index
++
];
// pTQueryInfo->uid = pk->uid;
pTQueryInfo
->
lastKey
=
pk
->
lastKey
;
// pTQueryInfo->groupIndex = i;
}
}
...
...
@@ -4302,7 +4295,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto
_error
;
}
int32_t
numOfRows
=
1
;
int32_t
numOfRows
=
1
0
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
pOperator
,
numOfRows
);
...
...
@@ -4313,9 +4306,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto
_error
;
}
pOperator
->
resultInfo
.
capacity
=
4096
;
pOperator
->
resultInfo
.
threshold
=
4096
*
0
.
75
;
int32_t
numOfGroup
=
10
;
// todo replaced with true value
pInfo
->
groupId
=
INT32_MIN
;
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
numOfGroup
);
...
...
@@ -4545,42 +4535,6 @@ _error:
return
NULL
;
}
static
int32_t
getColumnIndexInSource
(
SQueriedTableInfo
*
pTableInfo
,
SExprBasicInfo
*
pExpr
,
SColumnInfo
*
pTagCols
)
{
int32_t
j
=
0
;
if
(
TSDB_COL_IS_TAG
(
pExpr
->
pParam
[
0
].
pCol
->
type
))
{
if
(
pExpr
->
pParam
[
0
].
pCol
->
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
return
TSDB_TBNAME_COLUMN_INDEX
;
}
while
(
j
<
pTableInfo
->
numOfTags
)
{
if
(
pExpr
->
pParam
[
0
].
pCol
->
colId
==
pTagCols
[
j
].
colId
)
{
return
j
;
}
j
+=
1
;
}
}
/*else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { // user specified column data
return TSDB_UD_COLUMN_INDEX;
} else {
while (j < pTableInfo->numOfCols) {
if (pExpr->colInfo.colId == pTableInfo->colList[j].colId) {
return j;
}
j += 1;
}
}*/
return
INT32_MIN
;
// return a less than TSDB_TBNAME_COLUMN_INDEX value
}
bool
validateExprColumnInfo
(
SQueriedTableInfo
*
pTableInfo
,
SExprBasicInfo
*
pExpr
,
SColumnInfo
*
pTagCols
)
{
int32_t
j
=
getColumnIndexInSource
(
pTableInfo
,
pExpr
,
pTagCols
);
return
j
!=
INT32_MIN
;
}
static
SResSchema
createResSchema
(
int32_t
type
,
int32_t
bytes
,
int32_t
slotId
,
int32_t
scale
,
int32_t
precision
,
const
char
*
name
)
{
SResSchema
s
=
{
0
};
...
...
@@ -4739,7 +4693,7 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
static
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
);
static
void
setJoinColumnInfo
(
SColumnInfo
*
p
Info
,
const
SColumnNode
*
pLeft
Node
);
static
void
setJoinColumnInfo
(
SColumnInfo
*
p
Column
,
const
SColumnNode
*
pColumn
Node
);
static
SInterval
extractIntervalInfo
(
const
STableScanPhysiNode
*
pTableScanNode
)
{
SInterval
interval
=
{
...
...
@@ -5240,28 +5194,6 @@ _complete:
return
code
;
}
static
int32_t
updateOutputBufForTopBotQuery
(
SQueriedTableInfo
*
pTableInfo
,
SColumnInfo
*
pTagCols
,
SExprInfo
*
pExprs
,
int32_t
numOfOutput
,
int32_t
tagLen
,
bool
superTable
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
int16_t
functId
=
getExprFunctionId
(
&
pExprs
[
i
]);
if
(
functId
==
FUNCTION_TOP
||
functId
==
FUNCTION_BOTTOM
)
{
int32_t
j
=
getColumnIndexInSource
(
pTableInfo
,
&
pExprs
[
i
].
base
,
pTagCols
);
if
(
j
<
0
||
j
>=
pTableInfo
->
numOfCols
)
{
return
TSDB_CODE_QRY_INVALID_MSG
;
}
else
{
SColumnInfo
*
pCol
=
&
pTableInfo
->
colList
[
j
];
// int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.param[0].i,
// &pExprs[i].base.resSchema.type, &pExprs[i].base.resSchema.bytes,
// &pExprs[i].base.interBytes, tagLen, superTable, NULL);
// assert(ret == TSDB_CODE_SUCCESS);
}
}
}
return
TSDB_CODE_SUCCESS
;
}
void
setResultBufSize
(
STaskAttr
*
pQueryAttr
,
SResultInfo
*
pResultInfo
)
{
const
int32_t
DEFAULT_RESULT_MSG_SIZE
=
1024
*
(
1024
+
512
);
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
a4015fdb
...
...
@@ -304,8 +304,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
finalizeMultiTupleQueryResult
(
p
Info
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
,
pInfo
->
binfo
.
rowCellInfoOffset
);
finalizeMultiTupleQueryResult
(
p
Operator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
,
pInfo
->
binfo
.
rowCellInfoOffset
);
// if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
// } else {
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
a4015fdb
...
...
@@ -798,8 +798,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
finalizeMultiTupleQueryResult
(
p
Info
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
,
pInfo
->
binfo
.
rowCellInfoOffset
);
finalizeMultiTupleQueryResult
(
p
Operator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
,
pInfo
->
binfo
.
rowCellInfoOffset
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
true
);
OPTR_SET_OPENED
(
pOperator
);
...
...
@@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
finalizeMultiTupleQueryResult
(
p
BInfo
->
pCtx
,
p
Operator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pBInfo
->
resultRowInfo
,
finalizeMultiTupleQueryResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pBInfo
->
resultRowInfo
,
pBInfo
->
rowCellInfoOffset
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
true
);
...
...
@@ -1293,7 +1293,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
finalizeMultiTupleQueryResult
(
p
BInfo
->
pCtx
,
p
Operator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pBInfo
->
resultRowInfo
,
finalizeMultiTupleQueryResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
&
pBInfo
->
resultRowInfo
,
pBInfo
->
rowCellInfoOffset
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
true
);
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
a4015fdb
...
...
@@ -25,6 +25,7 @@ extern "C" {
bool
functionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
functionFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
dummyProcess
(
SqlFunctionCtx
*
UNUSED_PARAM
(
pCtx
));
int32_t
functionFinalizeWithResultBuf
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
char
*
finalResult
);
EFuncDataRequired
countDataRequired
(
SFunctionNode
*
pFunc
,
STimeWindow
*
pTimeWindow
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
a4015fdb
...
...
@@ -1074,7 +1074,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
translateFunc
=
translateSelectValue
,
.
getEnvFunc
=
getSelectivityFuncEnv
,
// todo remove this function later.
.
initFunc
=
functionSetup
,
.
sprocessFunc
=
NULL
,
.
processFunc
=
NULL
,
.
finalizeFunc
=
NULL
}
};
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
a4015fdb
...
...
@@ -184,7 +184,6 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
pResInfo
->
isNullRes
=
(
pResInfo
->
numOfRes
==
0
)
?
1
:
0
;
/*cleanupResultRowEntry(pResInfo);*/
char
*
in
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
colDataAppend
(
pCol
,
pBlock
->
info
.
rows
,
in
,
pResInfo
->
isNullRes
);
...
...
@@ -192,6 +191,10 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return
pResInfo
->
numOfRes
;
}
int32_t
dummyProcess
(
SqlFunctionCtx
*
UNUSED_PARAM
(
pCtx
))
{
return
0
;
}
int32_t
functionFinalizeWithResultBuf
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
char
*
finalResult
)
{
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录