Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
edf9253a
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
edf9253a
编写于
10月 15, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into enh/TD-19463_2
上级
fc090a1a
77cafebd
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
281 addition
and
241 deletion
+281
-241
include/common/tglobal.h
include/common/tglobal.h
+1
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+3
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+30
-22
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+61
-23
source/common/src/tglobal.c
source/common/src/tglobal.c
+3
-0
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+1
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+28
-22
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+24
-6
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+1
-1
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+4
-4
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+28
-37
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+1
-1
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+5
-5
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+0
-93
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+2
-3
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+1
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+68
-14
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+7
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+1
-2
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+2
-1
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+1
-1
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+4
-1
未找到文件。
include/common/tglobal.h
浏览文件 @
edf9253a
...
...
@@ -93,6 +93,7 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in
// query client
extern
int32_t
tsQueryPolicy
;
extern
int32_t
tsQueryRspPolicy
;
extern
int32_t
tsQuerySmaOptimize
;
extern
int32_t
tsQueryRsmaTolerance
;
extern
bool
tsQueryPlannerTrace
;
...
...
include/libs/qcom/query.h
浏览文件 @
edf9253a
...
...
@@ -54,6 +54,9 @@ typedef enum {
#define QUERY_POLICY_QNODE 3
#define QUERY_POLICY_CLIENT 4
#define QUERY_RSP_POLICY_DELAY 0
#define QUERY_RSP_POLICY_QUICK 1
typedef
struct
STableComInfo
{
uint8_t
numOfTags
;
// the number of tags in schema
uint8_t
precision
;
// the number of precision
...
...
source/client/src/clientImpl.c
浏览文件 @
edf9253a
...
...
@@ -1036,30 +1036,38 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
pRequest
->
type
=
pQuery
->
msgType
;
SArray
*
pMnodeList
=
taosArrayInit
(
4
,
sizeof
(
SQueryNodeLoad
));
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pUser
=
pRequest
->
pTscObj
->
user
,
.
sysInfo
=
pRequest
->
pTscObj
->
sysInfo
,
.
allocatorId
=
pRequest
->
allocatorRefId
};
SQueryPlan
*
pDag
=
NULL
;
int32_t
code
=
qCreateQueryPlan
(
&
cxt
,
&
pDag
,
pMnodeList
);
if
(
code
)
{
tscError
(
"0x%"
PRIx64
" failed to create query plan, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
}
else
{
pRequest
->
body
.
subplanNum
=
pDag
->
numOfSubplans
;
}
pRequest
->
metric
.
planEnd
=
taosGetTimestampUs
();
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
pUser
=
pRequest
->
pTscObj
->
user
,
.
sysInfo
=
pRequest
->
pTscObj
->
sysInfo
,
.
allocatorId
=
pRequest
->
allocatorRefId
};
SAppInstInfo
*
pAppInfo
=
getAppInfo
(
pRequest
);
SQueryPlan
*
pDag
=
NULL
;
int64_t
st
=
taosGetTimestampUs
();
int32_t
code
=
qCreateQueryPlan
(
&
cxt
,
&
pDag
,
pMnodeList
);
if
(
code
)
{
tscError
(
"0x%"
PRIx64
" failed to create query plan, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
}
else
{
pRequest
->
body
.
subplanNum
=
pDag
->
numOfSubplans
;
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
!
pRequest
->
validateOnly
)
{
SArray
*
pNodeList
=
NULL
;
buildAsyncExecNodeList
(
pRequest
,
&
pNodeList
,
pMnodeList
,
pResultMeta
);
pRequest
->
metric
.
planEnd
=
taosGetTimestampUs
();
if
(
code
==
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"0x%"
PRIx64
" create query plan success, elapsed time:%.2f ms, 0x%"
PRIx64
,
pRequest
->
self
,
(
pRequest
->
metric
.
planEnd
-
st
)
/
1000
.
0
,
pRequest
->
requestId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
!
pRequest
->
validateOnly
)
{
SArray
*
pNodeList
=
NULL
;
buildAsyncExecNodeList
(
pRequest
,
&
pNodeList
,
pMnodeList
,
pResultMeta
);
SRequestConnInfo
conn
=
{.
pTrans
=
getAppInfo
(
pRequest
)
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
...
...
source/client/test/clientTests.cpp
浏览文件 @
edf9253a
...
...
@@ -52,15 +52,15 @@ void printResult(TAOS_RES* pRes) {
int32_t
n
=
0
;
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
*
length
=
taos_fetch_lengths
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
numOfFields
;
++
i
)
{
printf
(
"(%d):%d "
,
i
,
length
[
i
]);
}
printf
(
"
\n
"
);
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
memset
(
str
,
0
,
sizeof
(
str
));
//
int32_t* length = taos_fetch_lengths(pRes);
// for
(int32_t i = 0; i < numOfFields; ++i) {
// printf("(%d):%d "
, i, length[i]);
//
}
//
printf("\n");
//
//
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
//
printf("%s\n", str);
//
memset(str, 0, sizeof(str));
}
}
...
...
@@ -102,17 +102,6 @@ void queryCallback(void* param, void* res, int32_t code) {
taos_fetch_raw_block_a
(
res
,
fetchCallback
,
param
);
}
void
queryCallback1
(
void
*
param
,
void
*
res
,
int32_t
code
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
printf
(
"failed to execute, reason:%s
\n
"
,
taos_errstr
(
res
));
}
taos_free_result
(
res
);
printf
(
"exec query:
\n
"
);
taos_query_a
(
param
,
"select * from tm1"
,
queryCallback
,
param
);
}
void
createNewTable
(
TAOS
*
pConn
,
int32_t
index
)
{
char
str
[
1024
]
=
{
0
};
sprintf
(
str
,
"create table tu%d using st2 tags(%d)"
,
index
,
index
);
...
...
@@ -123,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
}
taos_free_result
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
328
0
;
i
+=
20
)
{
for
(
int32_t
i
=
0
;
i
<
1000
0
;
i
+=
20
)
{
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
...
@@ -141,10 +130,49 @@ void createNewTable(TAOS* pConn, int32_t index) {
taos_free_result
(
p
);
}
}
void
*
queryThread
(
void
*
arg
)
{
TAOS
*
pConn
=
taos_connect
(
"192.168.0.209"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
printf
(
"failed to connect to db, reason:%s"
,
taos_errstr
(
pConn
));
return
NULL
;
}
int64_t
el
=
0
;
for
(
int32_t
i
=
0
;
i
<
5000000
;
++
i
)
{
int64_t
st
=
taosGetTimestampUs
();
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"SELECT _wstart as ts,max(usage_user) FROM benchmarkcpu.host_49 WHERE ts >= 1451618560000 AND ts < 1451622160000 INTERVAL(1m) ;"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
else
{
printResult
(
pRes
);
}
taos_free_result
(
pRes
);
el
+=
(
taosGetTimestampUs
()
-
st
);
if
(
i
%
1000
==
0
&&
i
!=
0
)
{
printf
(
"total:%d, avg time:%.2fms
\n
"
,
i
,
el
/
(
double
)(
i
*
1000
));
}
}
taos_close
(
pConn
);
return
NULL
;
}
static
int32_t
numOfThreads
=
1
;
}
// namespace
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
if
(
argc
>
1
)
{
numOfThreads
=
atoi
(
argv
[
1
]);
}
numOfThreads
=
TMAX
(
numOfThreads
,
1
);
printf
(
"the runing threads is:%d"
,
numOfThreads
);
return
RUN_ALL_TESTS
();
}
...
...
@@ -664,7 +692,6 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...
...
@@ -676,13 +703,14 @@ TEST(testCase, projection_query_tables) {
// }
// taos_free_result(pRes);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use
abc1
"
);
TAOS_RES* pRes = taos_query(pConn, "use
benchmarkcpu
");
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
...
...
@@ -722,6 +750,16 @@ TEST(testCase, projection_query_tables) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST
(
testCase
,
tsbs_perf_test
)
{
TdThread
qid
[
20
]
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
taosThreadCreate
(
&
qid
[
i
],
NULL
,
queryThread
,
NULL
);
}
getchar
();
}
#if 0
TEST(testCase, projection_query_stables) {
...
...
source/common/src/tglobal.c
浏览文件 @
edf9253a
...
...
@@ -90,6 +90,7 @@ bool tsSmlDataFormat = false;
// query
int32_t
tsQueryPolicy
=
1
;
int32_t
tsQueryRspPolicy
=
0
;
int32_t
tsQuerySmaOptimize
=
0
;
int32_t
tsQueryRsmaTolerance
=
1000
;
// the tolerance time (ms) to judge from which level to query rsma data.
bool
tsQueryPlannerTrace
=
false
;
...
...
@@ -350,6 +351,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"countAlwaysReturnValue"
,
tsCountAlwaysReturnValue
,
0
,
1
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"queryBufferSize"
,
tsQueryBufferSize
,
-
1
,
500000000000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"printAuth"
,
tsPrintAuth
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"queryRspPolicy"
,
tsQueryRspPolicy
,
0
,
1
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"multiProcess"
,
tsMultiProcess
,
0
,
2
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"mnodeShmSize"
,
tsMnodeShmSize
,
TSDB_MAX_MSG_SIZE
*
2
+
1024
,
INT32_MAX
,
0
)
!=
0
)
return
-
1
;
...
...
@@ -728,6 +730,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMonitorPort
=
(
uint16_t
)
cfgGetItem
(
pCfg
,
"monitorPort"
)
->
i32
;
tsMonitorMaxLogs
=
cfgGetItem
(
pCfg
,
"monitorMaxLogs"
)
->
i32
;
tsMonitorComp
=
cfgGetItem
(
pCfg
,
"monitorComp"
)
->
bval
;
tsQueryRspPolicy
=
cfgGetItem
(
pCfg
,
"queryRspPolicy"
)
->
i32
;
tsEnableTelem
=
cfgGetItem
(
pCfg
,
"telemetryReporting"
)
->
bval
;
tsTelemInterval
=
cfgGetItem
(
pCfg
,
"telemetryInterval"
)
->
i32
;
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
edf9253a
...
...
@@ -72,7 +72,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
mndPostProcessQueryMsg
(
pMsg
);
}
dGTrace
(
"msg:%p
, is freed, code:0x%x"
,
pMsg
,
code
);
dGTrace
(
"msg:%p
is freed, code:%s"
,
pMsg
,
tstrerror
(
code
)
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
edf9253a
...
...
@@ -543,6 +543,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
pMTree
->
pLoadInfo
=
pBlockLoadInfo
;
pMTree
->
destroyLoadInfo
=
destroyLoadInfo
;
ASSERT
(
pMTree
->
pLoadInfo
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
pFReader
->
pSet
->
nSttF
;
++
i
)
{
// open all last file
struct
SLDataIter
*
pIter
=
NULL
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
edf9253a
...
...
@@ -3395,19 +3395,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto
_err
;
}
code
=
tsdbTakeReadSnap
(
pReader
->
pTsdb
,
&
pReader
->
pReadSnap
,
pReader
->
idStr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
if
(
pReader
->
type
==
TIMEWINDOW_RANGE_CONTAINED
)
{
code
=
doOpenReaderImpl
(
pReader
);
if
(
numOfTables
>
0
)
{
code
=
tsdbTakeReadSnap
(
pReader
->
pTsdb
,
&
pReader
->
pReadSnap
,
pReader
->
idStr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
_err
;
}
}
else
{
STsdbReader
*
pPrevReader
=
pReader
->
innerReader
[
0
];
STsdbReader
*
pNextReader
=
pReader
->
innerReader
[
1
];
if
(
pReader
->
type
==
TIMEWINDOW_RANGE_CONTAINED
)
{
code
=
doOpenReaderImpl
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
else
{
STsdbReader
*
pPrevReader
=
pReader
->
innerReader
[
0
];
STsdbReader
*
pNextReader
=
pReader
->
innerReader
[
1
];
// we need only one row
pPrevReader
->
capacity
=
1
;
...
...
@@ -3422,19 +3423,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
pNextReader
->
pMemSchema
=
pReader
->
pMemSchema
;
pNextReader
->
pReadSnap
=
pReader
->
pReadSnap
;
code
=
doOpenReaderImpl
(
pPrevReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
code
=
doOpenReaderImpl
(
pPrevReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
code
=
doOpenReaderImpl
(
pNextReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
code
=
doOpenReaderImpl
(
pNextReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
code
=
doOpenReaderImpl
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
code
=
doOpenReaderImpl
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
}
...
...
@@ -3513,6 +3515,10 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree
(
pLReader
);
}
if
(
pReader
->
innerReader
[
0
]
!=
0
)
{
tsdbUntakeReadSnap
(
pReader
->
innerReader
[
0
]
->
pTsdb
,
pReader
->
innerReader
[
0
]
->
pReadSnap
,
pReader
->
idStr
);
}
tsdbDebug
(
"%p :io-cost summary: head-file:%"
PRIu64
", head-file time:%.2f ms, SMA:%"
PRId64
" SMA-time:%.2f ms, fileBlocks:%"
PRId64
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
edf9253a
...
...
@@ -212,6 +212,7 @@ typedef struct SExprSupp {
int32_t
numOfExprs
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pCtx
;
int32_t
*
rowEntryInfoOffset
;
// offset value for each row result cell info
SFilterInfo
*
pFilterInfo
;
}
SExprSupp
;
typedef
struct
SOperatorInfo
{
...
...
@@ -926,7 +927,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
,
SFilterInfo
*
pFilterInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
edf9253a
...
...
@@ -1113,15 +1113,24 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
SColumnInfoData
*
p
,
bool
keep
,
int32_t
status
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
)
{
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
,
SFilterInfo
*
pFilterInfo
)
{
if
(
pFilterNode
==
NULL
||
pBlock
->
info
.
rows
==
0
)
{
return
;
}
SFilterInfo
*
filter
=
NULL
;
SFilterInfo
*
filter
=
pFilterInfo
;
int64_t
st
=
taosGetTimestampUs
();
// pError("start filter");
// todo move to the initialization function
int32_t
code
=
filterInitFromNode
((
SNode
*
)
pFilterNode
,
&
filter
,
0
);
int32_t
code
=
0
;
bool
needFree
=
false
;
if
(
filter
==
NULL
)
{
needFree
=
true
;
code
=
filterInitFromNode
((
SNode
*
)
pFilterNode
,
&
filter
,
0
);
}
SFilterColumnParam
param1
=
{.
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
),
.
pDataBlock
=
pBlock
->
pDataBlock
};
code
=
filterSetDataFromSlotId
(
filter
,
&
param1
);
...
...
@@ -1130,7 +1139,10 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
// todo the keep seems never to be True??
bool
keep
=
filterExecute
(
filter
,
pBlock
,
&
p
,
NULL
,
param1
.
numOfCols
,
&
status
);
filterFreeInfo
(
filter
);
if
(
needFree
)
{
filterFreeInfo
(
filter
);
}
extractQualifiedTupleByFilterResult
(
pBlock
,
p
,
keep
,
status
);
...
...
@@ -2479,7 +2491,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
pInfo
,
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pAggInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
doFilter
(
pAggInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
,
NULL
);
if
(
!
hasRemainResults
(
&
pAggInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
...
...
@@ -2873,7 +2885,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break
;
}
doFilter
(
pInfo
->
pCondition
,
fillResult
,
pInfo
->
pColMatchColInfo
);
doFilter
(
pInfo
->
pCondition
,
fillResult
,
pInfo
->
pColMatchColInfo
,
NULL
);
if
(
fillResult
->
info
.
rows
>
0
)
{
break
;
}
...
...
@@ -3049,6 +3061,12 @@ void cleanupExprSupp(SExprSupp* pSupp) {
destroyExprInfo
(
pSupp
->
pExprInfo
,
pSupp
->
numOfExprs
);
taosMemoryFreeClear
(
pSupp
->
pExprInfo
);
}
if
(
pSupp
->
pFilterInfo
!=
NULL
)
{
filterFreeInfo
(
pSupp
->
pFilterInfo
);
pSupp
->
pFilterInfo
=
NULL
;
}
taosMemoryFree
(
pSupp
->
rowEntryInfoOffset
);
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
edf9253a
...
...
@@ -311,7 +311,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pRes
,
NULL
,
NULL
);
if
(
!
hasRemainResults
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
edf9253a
...
...
@@ -387,7 +387,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
break
;
}
if
(
pJoinInfo
->
pCondAfterMerge
!=
NULL
)
{
doFilter
(
pJoinInfo
->
pCondAfterMerge
,
pRes
,
NULL
);
doFilter
(
pJoinInfo
->
pCondAfterMerge
,
pRes
,
NULL
,
NULL
);
}
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
edf9253a
...
...
@@ -315,7 +315,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
// do apply filter
doFilter
(
pProjectInfo
->
pFilterNode
,
pFinalRes
,
NULL
);
doFilter
(
pProjectInfo
->
pFilterNode
,
pFinalRes
,
NULL
,
NULL
);
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if
(
pFinalRes
->
info
.
rows
>
0
||
(
pOperator
->
status
==
OP_EXEC_DONE
))
{
...
...
@@ -325,7 +325,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
else
{
// do apply filter
if
(
pRes
->
info
.
rows
>
0
)
{
doFilter
(
pProjectInfo
->
pFilterNode
,
pRes
,
NULL
);
doFilter
(
pProjectInfo
->
pFilterNode
,
pRes
,
NULL
,
NULL
);
if
(
pRes
->
info
.
rows
==
0
)
{
continue
;
}
...
...
@@ -518,7 +518,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
}
doFilter
(
pIndefInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
doFilter
(
pIndefInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
,
NULL
);
size_t
rows
=
pInfo
->
pRes
->
info
.
rows
;
if
(
rows
>
0
||
pOperator
->
status
==
OP_EXEC_DONE
)
{
break
;
...
...
@@ -620,7 +620,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
}
pRes
->
info
.
rows
=
1
;
doFilter
(
pProjectInfo
->
pFilterNode
,
pRes
,
NULL
);
doFilter
(
pProjectInfo
->
pFilterNode
,
pRes
,
NULL
,
NULL
);
/*int32_t status = */
doIngroupLimitOffset
(
&
pProjectInfo
->
limitInfo
,
0
,
pRes
,
pOperator
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
edf9253a
...
...
@@ -284,6 +284,18 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return
true
;
}
static
void
doSetTagColumnData
(
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pTableScanInfo
->
pseudoSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
}
static
int32_t
loadDataBlock
(
SOperatorInfo
*
pOperator
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -313,6 +325,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
qDebug
(
"%s data block skipped, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
skipBlocks
+=
1
;
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_STATIS_LOAD
)
{
...
...
@@ -320,6 +333,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
loadSMA
=
true
;
// mark the operation of load sma;
bool
success
=
doLoadBlockSMA
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
if
(
success
)
{
// failed to load the block sma data, data block statistics does not exist, load data block instead
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
qDebug
(
"%s data block SMA loaded, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -373,19 +388,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
relocateColumnData
(
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
pCols
,
true
);
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pTableScanInfo
->
pseudoSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
if
(
pTableScanInfo
->
pFilterNode
!=
NULL
)
{
int64_t
st
=
taosGetTimestampUs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
pOperator
->
exprSupp
.
pFilterInfo
);
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pTableScanInfo
->
readRecorder
.
filterTime
+=
el
;
...
...
@@ -754,6 +761,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo
->
dataBlockLoadFlag
=
pTableScanNode
->
dataRequired
;
pInfo
->
pResBlock
=
createResDataBlock
(
pDescNode
);
pInfo
->
pFilterNode
=
pTableScanNode
->
scan
.
node
.
pConditions
;
if
(
pInfo
->
pFilterNode
!=
NULL
)
{
code
=
filterInitFromNode
((
SNode
*
)
pInfo
->
pFilterNode
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
}
pInfo
->
scanFlag
=
MAIN_SCAN
;
pInfo
->
pColMatchInfo
=
pColList
;
pInfo
->
currentGroupId
=
-
1
;
...
...
@@ -1123,7 +1135,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
return
NULL
;
}
doFilter
(
pInfo
->
pCondition
,
pResult
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pResult
,
NULL
,
NULL
);
if
(
pResult
->
info
.
rows
==
0
)
{
continue
;
}
...
...
@@ -1311,15 +1323,10 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
blockDataEnsureCapacity
(
pResBlock
,
1
);
projectApplyFunctions
(
pTagCalSup
->
pExprInfo
,
pResBlock
,
pSrcBlock
,
pTagCalSup
->
pCtx
,
pTagCalSup
->
numOfExprs
,
NULL
);
projectApplyFunctions
(
pTagCalSup
->
pExprInfo
,
pResBlock
,
pSrcBlock
,
pTagCalSup
->
pCtx
,
1
,
NULL
);
ASSERT
(
pResBlock
->
info
.
rows
==
1
);
// build tagArray
/*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/
/*STagVal tagVal = {*/
/*.cid = 0,*/
/*.type = 0,*/
/*};*/
// build STag
// set STag
...
...
@@ -1474,7 +1481,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
if
(
filter
)
{
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
,
NULL
);
}
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
...
...
@@ -1896,7 +1903,7 @@ FETCH_NEXT_BLOCK:
}
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
,
NULL
);
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
if
(
pBlockInfo
->
rows
>
0
||
pInfo
->
pUpdateDataRes
->
info
.
rows
>
0
)
{
...
...
@@ -2115,9 +2122,6 @@ static void destroyStreamScanOperatorInfo(void* param) {
taosMemoryFree
(
pStreamScan
->
pPseudoExpr
);
}
cleanupExprSupp
(
&
pStreamScan
->
tbnameCalSup
);
cleanupExprSupp
(
&
pStreamScan
->
tagCalSup
);
updateInfoDestroy
(
pStreamScan
->
pUpdateInfo
);
blockDataDestroy
(
pStreamScan
->
pRes
);
blockDataDestroy
(
pStreamScan
->
pUpdateRes
);
...
...
@@ -2172,19 +2176,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
}
if
(
pTableScanNode
->
pTags
!=
NULL
)
{
int32_t
numOfTags
;
SExprInfo
*
pTagExpr
=
createExprInfo
(
pTableScanNode
->
pTags
,
NULL
,
&
numOfTags
);
if
(
pTagExpr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
initExprSupp
(
&
pInfo
->
tagCalSup
,
pTagExpr
,
numOfTags
)
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
}
pInfo
->
pBlockLists
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
pInfo
->
pBlockLists
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -2382,7 +2373,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
,
NULL
);
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
...
...
@@ -3455,7 +3446,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
if
(
pTableScanInfo
->
pFilterNode
!=
NULL
)
{
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
NULL
);
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pTableScanInfo
->
readRecorder
.
filterTime
+=
el
;
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
edf9253a
...
...
@@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return
NULL
;
}
doFilter
(
pInfo
->
pCondition
,
pBlock
,
pInfo
->
pColMatchInfo
);
doFilter
(
pInfo
->
pCondition
,
pBlock
,
pInfo
->
pColMatchInfo
,
NULL
);
if
(
blockDataGetNumOfRows
(
pBlock
)
==
0
)
{
continue
;
}
...
...
source/libs/executor/src/tfill.c
浏览文件 @
edf9253a
...
...
@@ -1501,7 +1501,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
doStreamFillImpl
(
pOperator
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
pInfo
->
pColMatchColInfo
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
pInfo
->
pColMatchColInfo
,
NULL
);
pOperator
->
resultInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
break
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
edf9253a
...
...
@@ -1277,7 +1277,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
,
NULL
);
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -1315,7 +1315,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBlock
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBlock
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pBlock
,
NULL
,
NULL
);
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -2019,7 +2019,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
,
NULL
);
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -2062,7 +2062,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
,
NULL
,
NULL
);
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -5251,7 +5251,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pIaInfo
->
inputOrder
,
scanFlag
,
true
);
doMergeAlignedIntervalAggImpl
(
pOperator
,
&
pIaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
pRes
);
doFilter
(
pMiaInfo
->
pCondition
,
pRes
,
NULL
);
doFilter
(
pMiaInfo
->
pCondition
,
pRes
,
NULL
,
NULL
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
capacity
)
{
break
;
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
edf9253a
...
...
@@ -6169,99 +6169,6 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return
pResInfo
->
numOfRes
;
}
int32_t
interpFunction
(
SqlFunctionCtx
*
pCtx
)
{
#if 0
int32_t fillType = (int32_t) pCtx->param[2].i64;
//bool ascQuery = (pCtx->order == TSDB_ORDER_ASC);
if (pCtx->start.key == pCtx->startTs) {
assert(pCtx->start.key != INT64_MIN);
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
goto interp_success_exit;
} else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
goto interp_success_exit;
}
switch (fillType) {
case TSDB_FILL_NULL:
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
break;
case TSDB_FILL_SET_VALUE:
tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
break;
case TSDB_FILL_LINEAR:
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = pCtx->start.key, .val = &v1};
SPoint point2 = {.key = pCtx->end.key, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
break;
case TSDB_FILL_PREV:
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) {
goto interp_exit;
}
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
break;
case TSDB_FILL_NEXT:
if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
goto interp_exit;
}
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
break;
case TSDB_FILL_NONE:
// do nothing
default:
goto interp_exit;
}
interp_success_exit:
*(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs;
INC_INIT_VAL(pCtx, 1);
interp_exit:
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->endTs = pCtx->startTs;
#endif
return
TSDB_CODE_SUCCESS
;
}
int32_t
cachedLastRowFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElems
=
0
;
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
edf9253a
...
...
@@ -124,9 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
EDealRes
scanPathOptHaveNormalColImpl
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
// *((bool*)pContext) =
// (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
*
((
bool
*
)
pContext
)
=
true
;
*
((
bool
*
)
pContext
)
=
(
COLUMN_TYPE_TAG
!=
((
SColumnNode
*
)
pNode
)
->
colType
&&
COLUMN_TYPE_TBNAME
!=
((
SColumnNode
*
)
pNode
)
->
colType
);
return
*
((
bool
*
)
pContext
)
?
DEAL_RES_END
:
DEAL_RES_IGNORE_CHILD
;
}
return
DEAL_RES_CONTINUE
;
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
edf9253a
...
...
@@ -123,6 +123,7 @@ typedef struct SQWTaskCtx {
int32_t
execId
;
int32_t
level
;
bool
queryGotData
;
bool
queryRsped
;
bool
queryEnd
;
bool
queryContinue
;
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
edf9253a
...
...
@@ -10,6 +10,7 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "tname.h"
#include "tglobal.h"
SQWorkerMgmt
gQwMgmt
=
{
.
lock
=
0
,
...
...
@@ -92,6 +93,19 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwSendQueryRsp
(
QW_FPARAMS_DEF
,
int32_t
msgType
,
SQWTaskCtx
*
ctx
,
int32_t
rspCode
,
bool
quickRsp
)
{
if
((
!
quickRsp
)
||
QUERY_RSP_POLICY_QUICK
==
tsQueryRspPolicy
)
{
if
(
!
ctx
->
localExec
)
{
qwBuildAndSendQueryRsp
(
msgType
,
&
ctx
->
ctrlConnInfo
,
rspCode
,
ctx
);
QW_TASK_DLOG
(
"query msg rsped, handle:%p, code:%x - %s"
,
ctx
->
ctrlConnInfo
.
handle
,
rspCode
,
tstrerror
(
rspCode
));
}
ctx
->
queryRsped
=
true
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwExecTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
queryStop
)
{
int32_t
code
=
0
;
bool
qcontinue
=
true
;
...
...
@@ -144,7 +158,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if
(
numOfResBlock
==
0
)
{
QW_TASK_DLOG
(
"qExecTask end with empty res, useconds:%"
PRIu64
,
useconds
);
}
else
{
QW_TASK_DLOG
(
"qExecTask done
"
,
""
);
QW_TASK_DLOG
(
"qExecTask done
, useconds:%"
PRIu64
,
useconds
);
}
dsEndPut
(
sinkHandle
,
useconds
);
...
...
@@ -234,6 +248,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
int32_t
code
=
0
;
SOutputData
output
=
{
0
};
if
(
NULL
==
ctx
->
sinkHandle
)
{
return
TSDB_CODE_SUCCESS
;
}
*
dataLen
=
0
;
while
(
true
)
{
...
...
@@ -407,6 +425,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET
(
TSDB_CODE_QRY_DUPLICATTED_OPERATION
);
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task already failed cause of %s, phase:%s"
,
tstrerror
(
ctx
->
rspCode
),
qwPhaseStr
(
phase
));
QW_ERR_JRET
(
ctx
->
rspCode
);
}
if
(
!
ctx
->
queryRsped
)
{
QW_TASK_ELOG
(
"ready msg has not been processed, phase:%s"
,
qwPhaseStr
(
phase
));
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_MSG_ERROR
);
...
...
@@ -419,6 +442,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_DROPPED
);
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task already failed cause of %s, phase:%s"
,
tstrerror
(
ctx
->
rspCode
),
qwPhaseStr
(
phase
));
QW_ERR_JRET
(
ctx
->
rspCode
);
}
if
(
QW_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
...
...
@@ -499,21 +527,17 @@ _return:
if
(
TSDB_CODE_SUCCESS
==
code
&&
QW_PHASE_POST_QUERY
==
phase
)
{
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_PART_SUCC
);
ctx
->
queryGotData
=
true
;
}
if
(
QW_PHASE_POST_QUERY
==
phase
&&
ctx
)
{
if
(
!
ctx
->
localExec
)
{
bool
rsped
=
false
;
SQWMsg
qwMsg
=
{.
msgType
=
ctx
->
msgType
,
.
connInfo
=
ctx
->
ctrlConnInfo
};
qwDbgSimulateRedirect
(
&
qwMsg
,
ctx
,
&
rsped
);
qwDbgSimulateDead
(
QW_FPARAMS
(),
ctx
,
&
rsped
);
if
(
!
rsped
)
{
qwBuildAndSendQueryRsp
(
input
->
msgType
+
1
,
&
ctx
->
ctrlConnInfo
,
code
,
ctx
);
QW_TASK_DLOG
(
"query msg rsped, handle:%p, code:%x - %s"
,
ctx
->
ctrlConnInfo
.
handle
,
code
,
tstrerror
(
code
));
}
if
(
QW_PHASE_POST_QUERY
==
phase
&&
ctx
&&
!
ctx
->
queryRsped
)
{
bool
rsped
=
false
;
SQWMsg
qwMsg
=
{.
msgType
=
ctx
->
msgType
,
.
connInfo
=
ctx
->
ctrlConnInfo
};
qwDbgSimulateRedirect
(
&
qwMsg
,
ctx
,
&
rsped
);
qwDbgSimulateDead
(
QW_FPARAMS
(),
ctx
,
&
rsped
);
if
(
!
rsped
)
{
qwSendQueryRsp
(
QW_FPARAMS
(),
input
->
msgType
+
1
,
ctx
,
code
,
false
);
}
ctx
->
queryRsped
=
true
;
}
if
(
ctx
)
{
...
...
@@ -551,6 +575,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
ctx
->
ctrlConnInfo
=
qwMsg
->
connInfo
;
ctx
->
phase
=
-
1
;
QW_ERR_JRET
(
qwAddTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_INIT
));
...
...
@@ -604,6 +629,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
qwSendQueryRsp
(
QW_FPARAMS
(),
qwMsg
->
msgType
+
1
,
ctx
,
code
,
true
);
ctx
->
level
=
plan
->
level
;
atomic_store_ptr
(
&
ctx
->
taskHandle
,
pTaskInfo
);
atomic_store_ptr
(
&
ctx
->
sinkHandle
,
sinkHandle
);
...
...
@@ -619,6 +646,31 @@ _return:
input
.
msgType
=
qwMsg
->
msgType
;
code
=
qwHandlePostPhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_POST_QUERY
,
&
input
,
NULL
);
if
(
ctx
!=
NULL
&&
QW_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
void
*
rsp
=
NULL
;
int32_t
dataLen
=
0
;
SOutputData
sOutput
=
{
0
};
QW_ERR_JRET
(
qwGetQueryResFromSink
(
QW_FPARAMS
(),
ctx
,
&
dataLen
,
&
rsp
,
&
sOutput
));
if
(
rsp
)
{
bool
qComplete
=
(
DS_BUF_EMPTY
==
sOutput
.
bufStatus
&&
sOutput
.
queryEnd
);
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
,
qComplete
);
if
(
qComplete
)
{
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryEnd
,
true
);
}
qwMsg
->
connInfo
=
ctx
->
dataConnInfo
;
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
qwBuildAndSendFetchRsp
(
ctx
->
fetchType
,
&
qwMsg
->
connInfo
,
rsp
,
dataLen
,
code
);
rsp
=
NULL
;
QW_TASK_DLOG
(
"fetch rsp send, handle:%p, code:%x - %s, dataLen:%d"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
dataLen
);
}
}
QW_RET
(
TSDB_CODE_SUCCESS
);
}
...
...
@@ -740,7 +792,9 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
locked
=
true
;
// RC WARNING
if
(
QW_QUERY_RUNNING
(
ctx
))
{
if
(
-
1
==
ctx
->
phase
||
false
==
ctx
->
queryGotData
)
{
QW_TASK_DLOG_E
(
"task query unfinished"
);
}
else
if
(
QW_QUERY_RUNNING
(
ctx
))
{
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryContinue
,
1
);
}
else
if
(
0
==
atomic_load_8
((
int8_t
*
)
&
ctx
->
queryInQueue
))
{
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXEC
);
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
edf9253a
...
...
@@ -913,7 +913,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
schSetTaskCandidateAddrs
(
pJob
,
pTask
));
if
(
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_ERR_RET
(
schEnsureHbConnection
(
pJob
,
pTask
));
//
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}
SCH_RET
(
schBuildAndSendMsg
(
pJob
,
pTask
,
NULL
,
plan
->
msgType
));
...
...
@@ -993,6 +993,12 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_ERR_JRET
(
schLaunchRemoteTask
(
pJob
,
pTask
));
}
#if 0
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
}
#endif
_return:
if
(
pJob
->
taskNum
>=
SCH_MIN_AYSNC_EXEC_NUM
)
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
edf9253a
...
...
@@ -155,8 +155,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
return
;
}
SCH_JOB_DLOG
(
"start to free job 0x%"
PRIx64
", errCode:0x%x"
,
*
jobId
,
errCode
);
SCH_JOB_DLOG
(
"start to free job 0x%"
PRIx64
", code:%s"
,
*
jobId
,
tstrerror
(
errCode
));
schHandleJobDrop
(
pJob
,
errCode
);
schReleaseJob
(
*
jobId
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
edf9253a
...
...
@@ -597,6 +597,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
SCliConn
*
conn
=
handle
->
data
;
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
tDebug
(
"%s conn %p alloc read buf"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
transAllocBuffer
(
pBuf
,
buf
);
}
static
void
cliRecvCb
(
uv_stream_t
*
handle
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
...
...
@@ -609,7 +610,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
while
(
transReadComplete
(
pBuf
))
{
t
Trace
(
"%s conn %p read complete"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
t
Debug
(
"%s conn %p read complete"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
if
(
pBuf
->
invalid
)
{
cliHandleExcept
(
conn
);
break
;
...
...
source/libs/transport/src/transComm.c
浏览文件 @
edf9253a
...
...
@@ -203,7 +203,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
}
int
transSetConnOption
(
uv_tcp_t
*
stream
)
{
uv_tcp_nodelay
(
stream
,
1
);
uv_tcp_nodelay
(
stream
,
0
);
int
ret
=
uv_tcp_keepalive
(
stream
,
5
,
60
);
return
ret
;
}
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
edf9253a
...
...
@@ -332,7 +332,10 @@ void uvOnSendCb(uv_write_t* req, int status) {
if
(
status
==
0
)
{
tTrace
(
"conn %p data already was written on stream"
,
conn
);
if
(
!
transQueueEmpty
(
&
conn
->
srvMsgs
))
{
SSvrMsg
*
msg
=
transQueuePop
(
&
conn
->
srvMsgs
);
SSvrMsg
*
msg
=
transQueuePop
(
&
conn
->
srvMsgs
);
STraceId
*
trace
=
&
msg
->
msg
.
info
.
traceId
;
tGDebug
(
"conn %p write data out"
,
conn
);
destroySmsg
(
msg
);
// send cached data
if
(
!
transQueueEmpty
(
&
conn
->
srvMsgs
))
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录