Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ac260959
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ac260959
编写于
1月 25, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-11818] update log.
上级
67e62442
变更
13
展开全部
隐藏空白更改
内联
并排
Showing
13 changed file
with
152 addition
and
376 deletion
+152
-376
include/libs/executor/executor.h
include/libs/executor/executor.h
+1
-1
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+1
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+66
-65
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+17
-10
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+4
-4
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+11
-193
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+31
-86
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+1
-1
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+8
-8
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+1
-1
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+8
-3
未找到文件。
include/libs/executor/executor.h
浏览文件 @
ac260959
...
@@ -45,7 +45,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
...
@@ -45,7 +45,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
* @param qId
* @param qId
* @return
* @return
*/
*/
int32_t
qCreateExecTask
(
void
*
readHandle
,
int32_t
vgId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
);
int32_t
qCreateExecTask
(
void
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
);
/**
/**
* The main task execution function, including query on both table and multiple tables,
* The main task execution function, including query on both table and multiple tables,
...
...
source/dnode/vnode/inc/tsdb.h
浏览文件 @
ac260959
...
@@ -104,8 +104,7 @@ typedef void* tsdbReadHandleT;
...
@@ -104,8 +104,7 @@ typedef void* tsdbReadHandleT;
* @param qinfo query info handle from query processor
* @param qinfo query info handle from query processor
* @return
* @return
*/
*/
tsdbReadHandleT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
tsdbReadHandleT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
uint64_t
taskId
);
void
*
pRef
);
/**
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
ac260959
此差异已折叠。
点击以展开。
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
ac260959
...
@@ -73,8 +73,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -73,8 +73,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tb_uid_t
uid
;
tb_uid_t
uid
;
int32_t
nCols
;
int32_t
nCols
;
int32_t
nTagCols
;
int32_t
nTagCols
;
SSchemaWrapper
*
pSW
;
SSchemaWrapper
*
pSW
=
NULL
;
STableMetaRsp
*
pTbMetaMsg
=
NULL
;
STableMetaRsp
*
pTbMetaMsg
=
NULL
;
SSchema
*
pTagSchema
;
SSchema
*
pTagSchema
;
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
int
msgLen
=
0
;
int
msgLen
=
0
;
...
@@ -145,15 +145,22 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -145,15 +145,22 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
_exit:
_exit:
free
(
pSW
->
pSchema
);
if
(
pSW
!=
NULL
)
{
free
(
pSW
);
tfree
(
pSW
->
pSchema
);
free
(
pTbCfg
->
name
);
tfree
(
pSW
);
free
(
pTbCfg
);
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
free
(
pTbCfg
->
stbCfg
.
pTagSchema
);
}
else
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
kvRowFree
(
pTbCfg
->
ctbCfg
.
pTag
);
}
}
if
(
pTbCfg
)
{
tfree
(
pTbCfg
->
name
);
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
free
(
pTbCfg
->
stbCfg
.
pTagSchema
);
}
else
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
kvRowFree
(
pTbCfg
->
ctbCfg
.
pTag
);
}
tfree
(
pTbCfg
);
}
rpcMsg
.
handle
=
pMsg
->
handle
;
rpcMsg
.
handle
=
pMsg
->
handle
;
rpcMsg
.
ahandle
=
pMsg
->
ahandle
;
rpcMsg
.
ahandle
=
pMsg
->
ahandle
;
rpcMsg
.
pCont
=
pTbMetaMsg
;
rpcMsg
.
pCont
=
pTbMetaMsg
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
ac260959
...
@@ -38,7 +38,7 @@
...
@@ -38,7 +38,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.
queryId
)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.
idstr
)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
ac260959
...
@@ -240,6 +240,7 @@ typedef struct STaskIdInfo {
...
@@ -240,6 +240,7 @@ typedef struct STaskIdInfo {
uint64_t
subplanId
;
uint64_t
subplanId
;
uint64_t
templateId
;
uint64_t
templateId
;
uint64_t
taskId
;
// this is a subplan id
uint64_t
taskId
;
// this is a subplan id
char
*
idstr
;
}
STaskIdInfo
;
}
STaskIdInfo
;
typedef
struct
SExecTaskInfo
{
typedef
struct
SExecTaskInfo
{
...
@@ -660,6 +661,6 @@ int32_t getMaximumIdleDurationSec();
...
@@ -660,6 +661,6 @@ int32_t getMaximumIdleDurationSec();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
,
uint64_t
taskId
);
#endif // TDENGINE_EXECUTORIMPL_H
#endif // TDENGINE_EXECUTORIMPL_H
source/libs/executor/src/executor.c
浏览文件 @
ac260959
...
@@ -50,11 +50,11 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
...
@@ -50,11 +50,11 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
input
,
GET_TASKID
(
pTaskInfo
)
);
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
input
,
pTaskInfo
->
id
.
queryId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"
failed to set the stream block data, reqId:0x%"
PRIx64
,
GET_TASKID
(
pTaskInfo
));
qError
(
"
%s failed to set the stream block data"
,
GET_TASKID
(
pTaskInfo
));
}
else
{
}
else
{
qDebug
(
"
set the stream block successfully, reqId:0x%"
PRIx64
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"
%s set the stream block successfully"
,
GET_TASKID
(
pTaskInfo
));
}
}
return
code
;
return
code
;
...
@@ -81,7 +81,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
...
@@ -81,7 +81,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
}
qTaskInfo_t
pTaskInfo
=
NULL
;
qTaskInfo_t
pTaskInfo
=
NULL
;
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
plan
,
&
pTaskInfo
,
NULL
);
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// TODO: destroy SSubplan & pTaskInfo
// TODO: destroy SSubplan & pTaskInfo
terrno
=
code
;
terrno
=
code
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
ac260959
...
@@ -69,11 +69,11 @@ void freeParam(STaskParam *param) {
...
@@ -69,11 +69,11 @@ void freeParam(STaskParam *param) {
tfree
(
param
->
prevResult
);
tfree
(
param
->
prevResult
);
}
}
int32_t
qCreateExecTask
(
void
*
readHandle
,
int32_t
vgId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
int32_t
qCreateExecTask
(
void
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
}
}
...
@@ -140,16 +140,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
...
@@ -140,16 +140,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int64_t
threadId
=
taosGetSelfPthreadId
();
int64_t
threadId
=
taosGetSelfPthreadId
();
// todo: remove it.
if
(
tinfo
==
NULL
)
{
return
TSDB_CODE_SUCCESS
;
}
*
pRes
=
NULL
;
*
pRes
=
NULL
;
int64_t
curOwner
=
0
;
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pTaskInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pTaskInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"
QID:0x%"
PRIx64
"-%p qhandle
is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
qError
(
"
%s-%p execTask
is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
(
void
*
)
curOwner
);
pTaskInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
pTaskInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
pTaskInfo
->
code
;
return
pTaskInfo
->
code
;
...
@@ -160,7 +154,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
...
@@ -160,7 +154,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
}
}
if
(
isTaskKilled
(
pTaskInfo
))
{
if
(
isTaskKilled
(
pTaskInfo
))
{
qDebug
(
"
QID:0x%"
PRIx64
"
it is already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"
%s
it is already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -169,12 +163,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
...
@@ -169,12 +163,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
publishQueryAbortEvent
(
pTaskInfo
,
ret
);
publishQueryAbortEvent
(
pTaskInfo
,
ret
);
pTaskInfo
->
code
=
ret
;
pTaskInfo
->
code
=
ret
;
qDebug
(
"
QID:0x%"
PRIx64
"
query abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
qDebug
(
"
%s
query abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
tstrerror
(
pTaskInfo
->
code
));
return
pTaskInfo
->
code
;
return
pTaskInfo
->
code
;
}
}
qDebug
(
"
QID:0x%"
PRIx64
"
query task is launched"
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"
%s
query task is launched"
,
GET_TASKID
(
pTaskInfo
));
bool
newgroup
=
false
;
bool
newgroup
=
false
;
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
...
@@ -193,59 +187,13 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
...
@@ -193,59 +187,13 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int32_t
current
=
(
*
pRes
!=
NULL
)
?
(
*
pRes
)
->
info
.
rows
:
0
;
int32_t
current
=
(
*
pRes
!=
NULL
)
?
(
*
pRes
)
->
info
.
rows
:
0
;
pTaskInfo
->
totalRows
+=
current
;
pTaskInfo
->
totalRows
+=
current
;
qDebug
(
"
QID:0x%"
PRIx64
"
task paused, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d"
,
qDebug
(
"
%s
task paused, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d"
,
GET_TASKID
(
pTaskInfo
),
current
,
pTaskInfo
->
totalRows
,
0
);
GET_TASKID
(
pTaskInfo
),
current
,
pTaskInfo
->
totalRows
,
0
);
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
return
pTaskInfo
->
code
;
return
pTaskInfo
->
code
;
}
}
int32_t
qRetrieveQueryResultInfo
(
qTaskInfo_t
qinfo
,
bool
*
buildRes
,
void
*
pRspContext
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
)
{
qError
(
"QInfo invalid qhandle"
);
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
*
buildRes
=
false
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
qDebug
(
"QID:0x%"
PRIx64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
return
pQInfo
->
code
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
tsRetrieveBlockingModel
)
{
pQInfo
->
rspContext
=
pRspContext
;
tsem_wait
(
&
pQInfo
->
ready
);
*
buildRes
=
true
;
code
=
pQInfo
->
code
;
}
else
{
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
STaskAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
pthread_mutex_lock
(
&
pQInfo
->
lock
);
assert
(
pQInfo
->
rspContext
==
NULL
);
if
(
pQInfo
->
dataReady
==
QUERY_RESULT_READY
)
{
*
buildRes
=
true
;
qDebug
(
"QID:0x%"
PRIx64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQueryAttr
->
resultRowSize
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
tstrerror
(
pQInfo
->
code
));
}
else
{
*
buildRes
=
false
;
qDebug
(
"QID:0x%"
PRIx64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
pQInfo
->
rspContext
=
pRspContext
;
assert
(
pQInfo
->
rspContext
!=
NULL
);
}
code
=
pQInfo
->
code
;
pthread_mutex_unlock
(
&
pQInfo
->
lock
);
}
return
code
;
}
void
*
qGetResultRetrieveMsg
(
qTaskInfo_t
qinfo
)
{
void
*
qGetResultRetrieveMsg
(
qTaskInfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
assert
(
pQInfo
!=
NULL
);
assert
(
pQInfo
!=
NULL
);
...
@@ -260,7 +208,7 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
...
@@ -260,7 +208,7 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
}
qDebug
(
"
QID:0x%"
PRIx64
" execTask killed"
,
pTaskInfo
->
id
.
queryId
);
qDebug
(
"
%s execTask killed"
,
GET_TASKID
(
pTaskInfo
)
);
setTaskKilled
(
pTaskInfo
);
setTaskKilled
(
pTaskInfo
);
// Wait for the query executing thread being stopped/
// Wait for the query executing thread being stopped/
...
@@ -279,7 +227,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
...
@@ -279,7 +227,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
}
qDebug
(
"
QID:0x%"
PRIx64
" query async killed"
,
pTaskInfo
->
id
.
queryId
);
qDebug
(
"
%s execTask async killed"
,
GET_TASKID
(
pTaskInfo
)
);
setTaskKilled
(
pTaskInfo
);
setTaskKilled
(
pTaskInfo
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -297,142 +245,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
...
@@ -297,142 +245,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
qDebug
(
"
QID:0x%"
PRIx64
" execTask completed, numOfRows:%"
PRId64
,
pTaskInfo
->
id
.
queryId
,
pTaskInfo
->
totalRows
);
qDebug
(
"
%s execTask completed, numOfRows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
)
,
pTaskInfo
->
totalRows
);
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
doDestroyTask
(
pTaskInfo
);
doDestroyTask
(
pTaskInfo
);
}
}
void
*
qOpenTaskMgmt
(
int32_t
vgId
)
{
const
int32_t
refreshHandleInterval
=
30
;
// every 30 seconds, refresh handle pool
char
cacheName
[
128
]
=
{
0
};
sprintf
(
cacheName
,
"qhandle_%d"
,
vgId
);
STaskMgmt
*
pTaskMgmt
=
calloc
(
1
,
sizeof
(
STaskMgmt
));
if
(
pTaskMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
NULL
;
}
pTaskMgmt
->
qinfoPool
=
taosCacheInit
(
TSDB_CACHE_PTR_KEY
,
refreshHandleInterval
,
true
,
freeqinfoFn
,
cacheName
);
pTaskMgmt
->
closed
=
false
;
pTaskMgmt
->
vgId
=
vgId
;
pthread_mutex_init
(
&
pTaskMgmt
->
lock
,
NULL
);
qDebug
(
"vgId:%d, open queryTaskMgmt success"
,
vgId
);
return
pTaskMgmt
;
}
void
qTaskMgmtNotifyClosing
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
STaskMgmt
*
pQueryMgmt
=
pQMgmt
;
qInfo
(
"vgId:%d, set querymgmt closed, wait for all queries cancelled"
,
pQueryMgmt
->
vgId
);
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
pQueryMgmt
->
closed
=
true
;
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
taosCacheRefresh
(
pQueryMgmt
->
qinfoPool
,
taskMgmtKillTaskFn
,
NULL
);
}
void
qQueryMgmtReOpen
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
STaskMgmt
*
pQueryMgmt
=
pQMgmt
;
qInfo
(
"vgId:%d, set querymgmt reopen"
,
pQueryMgmt
->
vgId
);
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
pQueryMgmt
->
closed
=
false
;
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
}
void
qCleanupTaskMgmt
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
STaskMgmt
*
pQueryMgmt
=
pQMgmt
;
int32_t
vgId
=
pQueryMgmt
->
vgId
;
assert
(
pQueryMgmt
->
closed
);
SCacheObj
*
pqinfoPool
=
pQueryMgmt
->
qinfoPool
;
pQueryMgmt
->
qinfoPool
=
NULL
;
taosCacheCleanup
(
pqinfoPool
);
pthread_mutex_destroy
(
&
pQueryMgmt
->
lock
);
tfree
(
pQueryMgmt
);
qDebug
(
"vgId:%d, queryMgmt cleanup completed"
,
vgId
);
}
void
**
qRegisterTask
(
void
*
pMgmt
,
uint64_t
qId
,
void
*
qInfo
)
{
if
(
pMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
STaskMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QID:0x%"
PRIx64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
qError
(
"QID:0x%"
PRIx64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
else
{
void
**
handle
=
taosCachePut
(
pQueryMgmt
->
qinfoPool
,
&
qId
,
sizeof
(
qId
),
&
qInfo
,
sizeof
(
TSDB_CACHE_PTR_TYPE
),
(
getMaximumIdleDurationSec
()
*
1000
));
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
return
handle
;
}
}
void
**
qAcquireTask
(
void
*
pMgmt
,
uint64_t
_key
)
{
STaskMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
closed
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
return
NULL
;
}
void
**
handle
=
taosCacheAcquireByKey
(
pQueryMgmt
->
qinfoPool
,
&
_key
,
sizeof
(
_key
));
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
return
NULL
;
}
else
{
return
handle
;
}
}
void
**
qReleaseTask
(
void
*
pMgmt
,
void
*
pQInfo
,
bool
freeHandle
)
{
STaskMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
return
NULL
;
}
taosCacheRelease
(
pQueryMgmt
->
qinfoPool
,
pQInfo
,
freeHandle
);
return
0
;
}
#if 0
#if 0
//kill by qid
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
...
@@ -444,7 +262,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
...
@@ -444,7 +262,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
}
qWarn("
QId:0x%"PRIx64"
be killed(no memory commit).", pQInfo->qId);
qWarn("
%s
be killed(no memory commit).", pQInfo->qId);
setTaskKilled(pQInfo);
setTaskKilled(pQInfo);
// wait query stop
// wait query stop
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ac260959
...
@@ -2432,10 +2432,6 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
...
@@ -2432,10 +2432,6 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
}
}
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
// if (IS_QUERY_KILLED(pTaskInfo)) {
// return true;
// }
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
// abort current query execution.
if
(
pTaskInfo
->
owner
!=
0
&&
((
taosGetTimestampSec
()
-
pTaskInfo
->
cost
.
start
/
1000
)
>
10
*
getMaximumIdleDurationSec
())
if
(
pTaskInfo
->
owner
!=
0
&&
((
taosGetTimestampSec
()
-
pTaskInfo
->
cost
.
start
/
1000
)
>
10
*
getMaximumIdleDurationSec
())
...
@@ -4441,9 +4437,9 @@ void queryCostStatis(SExecTaskInfo *pTaskInfo) {
...
@@ -4441,9 +4437,9 @@ void queryCostStatis(SExecTaskInfo *pTaskInfo) {
//
//
// calculateOperatorProfResults(pQInfo);
// calculateOperatorProfResults(pQInfo);
qDebug
(
"
QID:0x%"
PRIx64
"
:cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
" us, total blocks:%d, "
qDebug
(
"
%s
:cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
"load block statis:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
pTaskInfo
->
id
.
queryId
,
pSummary
->
elapsedTime
,
pSummary
->
firstStageMergeTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
GET_TASKID
(
pTaskInfo
)
,
pSummary
->
elapsedTime
,
pSummary
->
firstStageMergeTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
//
//
//qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
//qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
...
@@ -4994,7 +4990,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
...
@@ -4994,7 +4990,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
pResultRowInfo
->
curPos
=
0
;
pResultRowInfo
->
curPos
=
0
;
}
}
qDebug
(
"
QInfo:0x%"
PRIx64
"
start to repeat scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
qDebug
(
"
%s
start to repeat scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
}
}
...
@@ -5005,7 +5001,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
...
@@ -5005,7 +5001,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
qDebug
(
"
QInfo:0x%"
PRIx64
"
start to reverse scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
qDebug
(
"
%s
start to reverse scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
if
(
pResultRowInfo
->
size
>
0
)
{
if
(
pResultRowInfo
->
size
>
0
)
{
...
@@ -5170,7 +5166,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
...
@@ -5170,7 +5166,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
epSet
.
port
[
0
]
=
pSource
->
addr
.
epAddr
[
0
].
port
;
epSet
.
port
[
0
]
=
pSource
->
addr
.
epAddr
[
0
].
port
;
tstrncpy
(
epSet
.
fqdn
[
0
],
pSource
->
addr
.
epAddr
[
0
].
fqdn
,
tListLen
(
epSet
.
fqdn
[
0
]));
tstrncpy
(
epSet
.
fqdn
[
0
],
pSource
->
addr
.
epAddr
[
0
].
fqdn
,
tListLen
(
epSet
.
fqdn
[
0
]));
qDebug
(
"
QID:0x%"
PRIx64
"
build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", %d/%"
PRIzu
,
qDebug
(
"
%s
build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
epSet
.
fqdn
[
0
],
pSource
->
taskId
,
pExchangeInfo
->
current
,
totalSources
);
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
epSet
.
fqdn
[
0
],
pSource
->
taskId
,
pExchangeInfo
->
current
,
totalSources
);
pMsg
->
header
.
vgId
=
htonl
(
pSource
->
addr
.
nodeId
);
pMsg
->
header
.
vgId
=
htonl
(
pSource
->
addr
.
nodeId
);
...
@@ -5181,7 +5177,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
...
@@ -5181,7 +5177,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
// send the fetch remote task result reques
// send the fetch remote task result reques
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
pMsgSendInfo
)
{
if
(
NULL
==
pMsgSendInfo
)
{
qError
(
"
QID:0x%"
PRIx64
"
prepare message %d failed"
,
GET_TASKID
(
pTaskInfo
),
(
int32_t
)
sizeof
(
SMsgSendInfo
));
qError
(
"
%s
prepare message %d failed"
,
GET_TASKID
(
pTaskInfo
),
(
int32_t
)
sizeof
(
SMsgSendInfo
));
pTaskInfo
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
pTaskInfo
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_error
;
goto
_error
;
}
}
...
@@ -5198,7 +5194,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
...
@@ -5198,7 +5194,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SRetrieveTableRsp
*
pRsp
=
pExchangeInfo
->
pRsp
;
SRetrieveTableRsp
*
pRsp
=
pExchangeInfo
->
pRsp
;
if
(
pRsp
->
numOfRows
==
0
)
{
if
(
pRsp
->
numOfRows
==
0
)
{
qDebug
(
"
QID:0x%"
PRIx64
"
vgId:%d, taskID:0x%"
PRIx64
" %d of total completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
" try next"
,
qDebug
(
"
%s
vgId:%d, taskID:0x%"
PRIx64
" %d of total completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
" try next"
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pExchangeInfo
->
current
+
1
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pExchangeInfo
->
current
+
1
,
pExchangeInfo
->
rowsOfCurrentSource
,
pExchangeInfo
->
totalRows
);
pExchangeInfo
->
rowsOfCurrentSource
,
pExchangeInfo
->
totalRows
);
...
@@ -5237,7 +5233,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
...
@@ -5237,7 +5233,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
pExchangeInfo
->
rowsOfCurrentSource
+=
pRsp
->
numOfRows
;
pExchangeInfo
->
rowsOfCurrentSource
+=
pRsp
->
numOfRows
;
if
(
pRsp
->
completed
==
1
)
{
if
(
pRsp
->
completed
==
1
)
{
qDebug
(
"
QID:0x%"
PRIx64
"
fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, rowsOfSource:%"
PRIu64
qDebug
(
"
%s
fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
" try next %d/%"
PRIzu
,
", totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
" try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pRes
->
info
.
rows
,
pExchangeInfo
->
rowsOfCurrentSource
,
pExchangeInfo
->
totalRows
,
pExchangeInfo
->
bytes
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pRes
->
info
.
rows
,
pExchangeInfo
->
rowsOfCurrentSource
,
pExchangeInfo
->
totalRows
,
pExchangeInfo
->
bytes
,
pExchangeInfo
->
current
+
1
,
totalSources
);
pExchangeInfo
->
current
+
1
,
totalSources
);
...
@@ -5245,7 +5241,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
...
@@ -5245,7 +5241,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
pExchangeInfo
->
rowsOfCurrentSource
=
0
;
pExchangeInfo
->
rowsOfCurrentSource
=
0
;
pExchangeInfo
->
current
+=
1
;
pExchangeInfo
->
current
+=
1
;
}
else
{
}
else
{
qDebug
(
"
QID:0x%"
PRIx64
"
fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
,
qDebug
(
"
%s
fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pRes
->
info
.
rows
,
pExchangeInfo
->
totalRows
,
pExchangeInfo
->
bytes
);
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pRes
->
info
.
rows
,
pExchangeInfo
->
totalRows
,
pExchangeInfo
->
bytes
);
}
}
...
@@ -7728,32 +7724,38 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t
...
@@ -7728,32 +7724,38 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
SExecTaskInfo
*
createExecTaskInfo
(
uint64_t
queryId
)
{
static
SExecTaskInfo
*
createExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
)
{
SExecTaskInfo
*
pTaskInfo
=
calloc
(
1
,
sizeof
(
SExecTaskInfo
));
SExecTaskInfo
*
pTaskInfo
=
calloc
(
1
,
sizeof
(
SExecTaskInfo
));
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
id
.
taskId
=
taskId
;
char
*
p
=
calloc
(
1
,
128
);
snprintf
(
p
,
128
,
"TID:0x%"
PRIu64
" QID:0x%"
PRIx64
,
taskId
,
queryId
);
pTaskInfo
->
id
.
idstr
=
strdup
(
p
);
return
pTaskInfo
;
return
pTaskInfo
;
}
}
static
tsdbReadHandleT
doCreateDataReadHandle
(
STableScanPhyNode
*
pTableScanNode
,
void
*
readerHandle
,
uint64_t
queryId
);
static
tsdbReadHandleT
doCreateDataReadHandle
(
STableScanPhyNode
*
pTableScanNode
,
void
*
readerHandle
,
uint64_t
queryId
,
uint64_t
taskId
);
SOperatorInfo
*
doCreateOperatorTreeNode
(
SPhyNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
void
*
readerHandle
,
uint64_t
queryId
)
{
SOperatorInfo
*
doCreateOperatorTreeNode
(
SPhyNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
void
*
readerHandle
,
uint64_t
queryId
,
uint64_t
taskId
)
{
if
(
pPhyNode
->
pChildren
==
NULL
||
taosArrayGetSize
(
pPhyNode
->
pChildren
)
==
0
)
{
if
(
pPhyNode
->
pChildren
==
NULL
||
taosArrayGetSize
(
pPhyNode
->
pChildren
)
==
0
)
{
if
(
pPhyNode
->
info
.
type
==
OP_TableScan
)
{
if
(
pPhyNode
->
info
.
type
==
OP_TableScan
)
{
SScanPhyNode
*
pScanPhyNode
=
(
SScanPhyNode
*
)
pPhyNode
;
SScanPhyNode
*
pScanPhyNode
=
(
SScanPhyNode
*
)
pPhyNode
;
size_t
numOfCols
=
taosArrayGetSize
(
pPhyNode
->
pTargets
);
size_t
numOfCols
=
taosArrayGetSize
(
pPhyNode
->
pTargets
);
tsdbReadHandleT
tReaderHandle
=
doCreateDataReadHandle
((
STableScanPhyNode
*
)
pPhyNode
,
readerHandle
,
(
uint64_t
)
queryId
);
tsdbReadHandleT
tReaderHandle
=
doCreateDataReadHandle
((
STableScanPhyNode
*
)
pPhyNode
,
readerHandle
,
(
uint64_t
)
queryId
,
taskId
);
return
createTableScanOperatorInfo
(
tReaderHandle
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pTaskInfo
);
return
createTableScanOperatorInfo
(
tReaderHandle
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pTaskInfo
);
}
else
if
(
pPhyNode
->
info
.
type
==
OP_DataBlocksOptScan
)
{
}
else
if
(
pPhyNode
->
info
.
type
==
OP_DataBlocksOptScan
)
{
SScanPhyNode
*
pScanPhyNode
=
(
SScanPhyNode
*
)
pPhyNode
;
SScanPhyNode
*
pScanPhyNode
=
(
SScanPhyNode
*
)
pPhyNode
;
size_t
numOfCols
=
taosArrayGetSize
(
pPhyNode
->
pTargets
);
size_t
numOfCols
=
taosArrayGetSize
(
pPhyNode
->
pTargets
);
tsdbReadHandleT
tReaderHandle
=
doCreateDataReadHandle
((
STableScanPhyNode
*
)
pPhyNode
,
readerHandle
,
(
uint64_t
)
queryId
);
tsdbReadHandleT
tReaderHandle
=
doCreateDataReadHandle
((
STableScanPhyNode
*
)
pPhyNode
,
readerHandle
,
(
uint64_t
)
queryId
,
taskId
);
return
createDataBlocksOptScanInfo
(
tReaderHandle
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pTaskInfo
);
return
createDataBlocksOptScanInfo
(
tReaderHandle
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pTaskInfo
);
}
else
if
(
pPhyNode
->
info
.
type
==
OP_Exchange
)
{
}
else
if
(
pPhyNode
->
info
.
type
==
OP_Exchange
)
{
...
@@ -7771,13 +7773,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
...
@@ -7771,13 +7773,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhyNode
*
pChildNode
=
taosArrayGetP
(
pPhyNode
->
pChildren
,
i
);
SPhyNode
*
pChildNode
=
taosArrayGetP
(
pPhyNode
->
pChildren
,
i
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
readerHandle
,
queryId
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
readerHandle
,
queryId
,
taskId
);
return
createAggregateOperatorInfo
(
op
,
pPhyNode
->
pTargets
,
pTaskInfo
);
return
createAggregateOperatorInfo
(
op
,
pPhyNode
->
pTargets
,
pTaskInfo
);
}
}
}
}
}
}
static
tsdbReadHandleT
createDataReadHandle
(
STableScanPhyNode
*
pTableScanNode
,
STableGroupInfo
*
pGroupInfo
,
void
*
readerHandle
,
uint64_t
queryId
)
{
static
tsdbReadHandleT
createDataReadHandle
(
STableScanPhyNode
*
pTableScanNode
,
STableGroupInfo
*
pGroupInfo
,
void
*
readerHandle
,
uint64_t
queryId
,
uint64_t
taskId
)
{
STsdbQueryCond
cond
=
{.
loadExternalRows
=
false
};
STsdbQueryCond
cond
=
{.
loadExternalRows
=
false
};
cond
.
order
=
pTableScanNode
->
scan
.
order
;
cond
.
order
=
pTableScanNode
->
scan
.
order
;
...
@@ -7796,10 +7798,10 @@ static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, S
...
@@ -7796,10 +7798,10 @@ static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, S
cond
.
colList
[
i
].
colId
=
pSchema
->
colId
;
cond
.
colList
[
i
].
colId
=
pSchema
->
colId
;
}
}
return
tsdbQueryTables
(
readerHandle
,
&
cond
,
pGroupInfo
,
queryId
,
NULL
);
return
tsdbQueryTables
(
readerHandle
,
&
cond
,
pGroupInfo
,
queryId
,
taskId
);
}
}
static
tsdbReadHandleT
doCreateDataReadHandle
(
STableScanPhyNode
*
pTableScanNode
,
void
*
readerHandle
,
uint64_t
queryId
)
{
static
tsdbReadHandleT
doCreateDataReadHandle
(
STableScanPhyNode
*
pTableScanNode
,
void
*
readerHandle
,
uint64_t
queryId
,
uint64_t
taskId
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
STableGroupInfo
groupInfo
=
{
0
};
STableGroupInfo
groupInfo
=
{
0
};
...
@@ -7826,28 +7828,28 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
...
@@ -7826,28 +7828,28 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
if
(
groupInfo
.
numOfTables
==
0
)
{
if
(
groupInfo
.
numOfTables
==
0
)
{
code
=
0
;
code
=
0
;
qDebug
(
"no table qualified for query,
reqId:0x%"
PRIx64
,
queryId
);
qDebug
(
"no table qualified for query,
TID:0x%"
PRIx64
", QID:0x%"
PRIx64
,
taskId
,
queryId
);
goto
_error
;
goto
_error
;
}
}
return
createDataReadHandle
(
pTableScanNode
,
&
groupInfo
,
readerHandle
,
queryId
);
return
createDataReadHandle
(
pTableScanNode
,
&
groupInfo
,
readerHandle
,
queryId
,
taskId
);
_error:
_error:
terrno
=
code
;
terrno
=
code
;
return
NULL
;
return
NULL
;
}
}
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
)
{
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
,
uint64_t
taskId
)
{
uint64_t
queryId
=
pPlan
->
id
.
queryId
;
uint64_t
queryId
=
pPlan
->
id
.
queryId
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
*
pTaskInfo
=
createExecTaskInfo
(
queryId
);
*
pTaskInfo
=
createExecTaskInfo
(
queryId
,
taskId
);
if
(
*
pTaskInfo
==
NULL
)
{
if
(
*
pTaskInfo
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_complete
;
goto
_complete
;
}
}
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
readerHandle
,
queryId
);
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
readerHandle
,
queryId
,
taskId
);
if
((
*
pTaskInfo
)
->
pRoot
==
NULL
)
{
if
((
*
pTaskInfo
)
->
pRoot
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_complete
;
goto
_complete
;
...
@@ -8814,72 +8816,15 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
...
@@ -8814,72 +8816,15 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
}
}
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"
QID:0x%"
PRIx64
" start to free execTask"
,
pTaskInfo
->
id
.
queryId
);
qDebug
(
"
%s start to free execTask"
,
GET_TASKID
(
pTaskInfo
)
);
doDestroyTableQueryInfo
(
&
pTaskInfo
->
tableqinfoGroupInfo
);
doDestroyTableQueryInfo
(
&
pTaskInfo
->
tableqinfoGroupInfo
);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
qDebug
(
"
QID:0x%"
PRIx64
" execTask is freed"
,
pTaskInfo
->
id
.
queryId
);
qDebug
(
"
%s execTask is freed"
,
GET_TASKID
(
pTaskInfo
)
);
tfree
(
pTaskInfo
);
tfree
(
pTaskInfo
);
}
}
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
,
int8_t
compressed
,
int32_t
*
compLen
)
{
// the remained number of retrieved rows, not the interpolated result
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
STaskAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
// load data from file to msg buffer
if
(
pQueryAttr
->
tsCompQuery
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pRuntimeEnv
->
outputBuf
->
pDataBlock
,
0
);
FILE
*
f
=
*
(
FILE
**
)
pColInfoData
->
pData
;
// TODO refactor
// make sure file exist
if
(
f
)
{
off_t
s
=
lseek
(
fileno
(
f
),
0
,
SEEK_END
);
assert
(
s
==
pRuntimeEnv
->
outputBuf
->
info
.
rows
);
//qDebug("QInfo:0x%"PRIx64" ts comp data return, file:%p, size:%"PRId64, pQInfo->qId, f, (uint64_t)s);
if
(
fseek
(
f
,
0
,
SEEK_SET
)
>=
0
)
{
size_t
sz
=
fread
(
data
,
1
,
s
,
f
);
if
(
sz
<
s
)
{
// todo handle error
//qError("fread(f:%p,%d) failed, rsize:%" PRId64 ", expect size:%" PRId64, f, fileno(f), (uint64_t)sz, (uint64_t)s);
assert
(
0
);
}
}
else
{
UNUSED
(
s
);
//qError("fseek(f:%p,%d) failed, error:%s", f, fileno(f), strerror(errno));
assert
(
0
);
}
// dump error info
if
(
s
<=
(
sizeof
(
STSBufFileHeader
)
+
sizeof
(
STSGroupBlockInfo
)
+
6
*
sizeof
(
int32_t
)))
{
// qDump(data, s);
assert
(
0
);
}
fclose
(
f
);
*
(
FILE
**
)
pColInfoData
->
pData
=
NULL
;
}
// all data returned, set query over
if
(
Q_STATUS_EQUAL
(
pRuntimeEnv
->
status
,
TASK_COMPLETED
))
{
// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER);
}
}
else
{
doCopyQueryResultToMsg
(
pQInfo
,
(
int32_t
)
pRuntimeEnv
->
outputBuf
->
info
.
rows
,
data
,
compressed
,
compLen
);
}
//qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId,
// pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total);
if
(
pQueryAttr
->
limit
.
limit
>
0
&&
pQueryAttr
->
limit
.
limit
==
pRuntimeEnv
->
resultInfo
.
total
)
{
//qDebug("QInfo:0x%"PRIx64" results limitation reached, limitation:%"PRId64, pQInfo->qId, pQueryAttr->limit.limit);
// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER);
}
return
TSDB_CODE_SUCCESS
;
}
bool
doBuildResCheck
(
SQInfo
*
pQInfo
)
{
bool
doBuildResCheck
(
SQInfo
*
pQInfo
)
{
bool
buildRes
=
false
;
bool
buildRes
=
false
;
...
...
source/libs/executor/test/executorTests.cpp
浏览文件 @
ac260959
...
@@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) {
...
@@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) {
SExecTaskInfo
*
pTaskInfo
=
nullptr
;
SExecTaskInfo
*
pTaskInfo
=
nullptr
;
DataSinkHandle
sinkHandle
=
nullptr
;
DataSinkHandle
sinkHandle
=
nullptr
;
int32_t
code
=
qCreateExecTask
((
void
*
)
1
,
2
,
NULL
,
(
void
**
)
&
pTaskInfo
,
&
sinkHandle
);
int32_t
code
=
qCreateExecTask
((
void
*
)
1
,
2
,
1
,
NULL
,
(
void
**
)
&
pTaskInfo
,
&
sinkHandle
);
}
}
#pragma GCC diagnostic pop
#pragma GCC diagnostic pop
\ No newline at end of file
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
ac260959
...
@@ -173,17 +173,17 @@ typedef struct SQWorkerMgmt {
...
@@ -173,17 +173,17 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:
0x
%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:
0x
%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:
0x
%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:
0x
%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:
0x
%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:
0x
%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:
0x
%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:
0x
%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
ac260959
...
@@ -974,7 +974,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
...
@@ -974,7 +974,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET
(
code
);
QW_ERR_JRET
(
code
);
}
}
code
=
qCreateExecTask
(
qwMsg
->
node
,
0
,
(
struct
SSubplan
*
)
plan
,
&
pTaskInfo
,
&
sinkHandle
);
code
=
qCreateExecTask
(
qwMsg
->
node
,
0
,
tId
,
(
struct
SSubplan
*
)
plan
,
&
pTaskInfo
,
&
sinkHandle
);
if
(
code
)
{
if
(
code
)
{
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%s"
,
tstrerror
(
code
));
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%s"
,
tstrerror
(
code
));
QW_ERR_JRET
(
code
);
QW_ERR_JRET
(
code
);
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
ac260959
...
@@ -1360,9 +1360,14 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
...
@@ -1360,9 +1360,14 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
tDebug
(
"%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d"
,
pConn
->
info
,
TMSG_INFO
(
pHead
->
msgType
),
tDebug
(
"%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d"
,
pConn
->
info
,
TMSG_INFO
(
pHead
->
msgType
),
pConn
->
peerFqdn
,
pConn
->
peerPort
,
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
pConn
->
peerFqdn
,
pConn
->
peerPort
,
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
else
{
}
else
{
if
(
pHead
->
code
==
0
)
pConn
->
secured
=
1
;
// for success response, set link as secured
if
(
pHead
->
code
==
0
)
{
tDebug
(
"%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d"
,
pConn
->
info
,
TMSG_INFO
(
pHead
->
msgType
),
pConn
->
secured
=
1
;
// for success response, set link as secured
pConn
->
peerIp
,
pConn
->
peerPort
,
htonl
(
pHead
->
code
),
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
char
ipport
[
40
]
=
{
0
};
taosIpPort2String
(
pConn
->
peerIp
,
pConn
->
peerPort
,
ipport
);
tDebug
(
"%s, %s is sent to %s, code:0x%x len:%d sig:0x%08x:0x%08x:%d"
,
pConn
->
info
,
TMSG_INFO
(
pHead
->
msgType
),
ipport
,
htonl
(
pHead
->
code
),
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
}
// tTrace("connection type is: %d", pConn->connType);
// tTrace("connection type is: %d", pConn->connType);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录