Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
de7e24b4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
de7e24b4
编写于
1月 25, 2022
作者:
H
Haojun Liao
提交者:
GitHub
1月 25, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10006 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
上级
65b07fd1
550d276d
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
87 addition
and
326 deletion
+87
-326
2.0/src/query/src/queryMain.c
2.0/src/query/src/queryMain.c
+2
-2
include/os/os.h
include/os/os.h
+1
-0
include/os/osDef.h
include/os/osDef.h
+2
-1
source/dnode/mgmt/impl/src/dndMgmt.c
source/dnode/mgmt/impl/src/dndMgmt.c
+2
-0
source/dnode/vnode/src/vnd/vnodeMgr.c
source/dnode/vnode/src/vnd/vnodeMgr.c
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-9
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+9
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+30
-30
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+33
-280
source/libs/planner/src/logicPlan.c
source/libs/planner/src/logicPlan.c
+0
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+1
-1
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+1
-1
未找到文件。
2.0/src/query/src/queryMain.c
浏览文件 @
de7e24b4
...
...
@@ -461,7 +461,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
}
qDebug
(
"QInfo:0x%"
PRIx64
" query killed"
,
pQInfo
->
qId
);
set
Query
Killed
(
pQInfo
);
set
Task
Killed
(
pQInfo
);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
...
...
@@ -634,7 +634,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qWarn
(
"QId:0x%"
PRIx64
" be killed(no memory commit)."
,
pQInfo
->
qId
);
set
Query
Killed
(
pQInfo
);
set
Task
Killed
(
pQInfo
);
// wait query stop
int32_t
loop
=
0
;
...
...
include/os/os.h
浏览文件 @
de7e24b4
...
...
@@ -51,6 +51,7 @@ extern "C" {
#include <libgen.h>
#include <sys/mman.h>
#include <sys/prctl.h>
#include "osAtomic.h"
#include "osDef.h"
...
...
include/os/osDef.h
浏览文件 @
de7e24b4
...
...
@@ -181,7 +181,8 @@ extern "C" {
#endif
#else
// Windows
#define setThreadName(name)
// #define setThreadName(name)
#define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0)
#endif
#if defined(_WIN32)
...
...
source/dnode/mgmt/impl/src/dndMgmt.c
浏览文件 @
de7e24b4
...
...
@@ -487,6 +487,8 @@ static void *dnodeThreadRoutine(void *param) {
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
int32_t
ms
=
pDnode
->
cfg
.
statusInterval
*
1000
;
setThreadName
(
"dnode-hb"
);
while
(
true
)
{
pthread_testcancel
();
taosMsleep
(
ms
);
...
...
source/dnode/vnode/src/vnd/vnodeMgr.c
浏览文件 @
de7e24b4
...
...
@@ -98,6 +98,8 @@ int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
/* ------------------------ STATIC METHODS ------------------------ */
static
void
*
loop
(
void
*
arg
)
{
setThreadName
(
"vnode-commit"
);
SVnodeTask
*
pTask
;
for
(;;)
{
pthread_mutex_lock
(
&
(
vnodeMgr
.
mutex
));
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
de7e24b4
...
...
@@ -250,9 +250,8 @@ typedef struct SExecTaskInfo {
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
uint64_t
totalRows
;
// total number of rows
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t
lock
;
// used to synchronize the rsp/query threads
char
*
sql
;
// query sql string
jmp_buf
env
;
//
struct
SOperatorInfo
*
pRoot
;
...
...
@@ -622,8 +621,6 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t nu
int32_t
createQueryFilter
(
char
*
data
,
uint16_t
len
,
SFilterInfo
**
pFilters
);
SGroupbyExpr
*
createGroupbyExprFromMsg
(
SQueryTableReq
*
pQueryMsg
,
SColIndex
*
pColIndex
,
int32_t
*
code
);
SQInfo
*
createQInfoImpl
(
SQueryTableReq
*
pQueryMsg
,
SGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
SExprInfo
*
pSecExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SColumnInfo
*
pTagCols
,
SFilterInfo
*
pFilters
,
int32_t
vgId
,
char
*
sql
,
uint64_t
qId
,
struct
SUdfInfo
*
pUdfInfo
);
int32_t
initQInfo
(
STsBufInfo
*
pTsBufInfo
,
void
*
tsdb
,
void
*
sourceOptr
,
SQInfo
*
pQInfo
,
STaskParam
*
param
,
char
*
start
,
int32_t
prevResultLen
,
void
*
merger
);
...
...
@@ -645,20 +642,18 @@ void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status);
bool
onlyQueryTags
(
STaskAttr
*
pQueryAttr
);
//void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
bool
isValidQInfo
(
void
*
param
);
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
,
int8_t
compressed
,
int32_t
*
compLen
);
size_t
getResultSize
(
SQInfo
*
pQInfo
,
int64_t
*
numOfRows
);
void
set
QueryKilled
(
SQInfo
*
pQ
Info
);
void
set
TaskKilled
(
SExecTaskInfo
*
pTask
Info
);
void
publishOperatorProfEvent
(
SOperatorInfo
*
operatorInfo
,
EQueryProfEventType
eventType
);
void
publishQueryAbortEvent
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
code
);
void
calculateOperatorProfResults
(
SQInfo
*
pQInfo
);
void
queryCostStatis
(
S
QInfo
*
pQ
Info
);
void
queryCostStatis
(
S
ExecTaskInfo
*
pTask
Info
);
void
doDestroyTask
(
S
QInfo
*
pQ
Info
);
void
doDestroyTask
(
S
ExecTaskInfo
*
pTask
Info
);
void
freeQueryAttr
(
STaskAttr
*
pQuery
);
int32_t
getMaximumIdleDurationSec
();
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
de7e24b4
...
...
@@ -121,11 +121,19 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
}
static
bool
allocBuf
(
SDataDispatchHandle
*
pDispatcher
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
if
(
taosQueueSize
(
pDispatcher
->
pDataBlocks
)
>=
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
)
{
uint32_t
capacity
=
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
;
if
(
taosQueueSize
(
pDispatcher
->
pDataBlocks
)
>
capacity
)
{
qError
(
"SinkNode queue is full, no capacity, max:%d, current:%d, no capacity"
,
capacity
,
taosQueueSize
(
pDispatcher
->
pDataBlocks
));
return
false
;
}
pBuf
->
allocSize
=
DATA_META_LENGTH
(
pInput
->
pTableRetrieveTsMap
)
+
pDispatcher
->
schema
.
resultRowSize
*
pInput
->
pData
->
info
.
rows
;
pBuf
->
pData
=
malloc
(
pBuf
->
allocSize
);
if
(
pBuf
->
pData
==
NULL
)
{
qError
(
"SinkNode failed to malloc memory, size:%d, code:%d"
,
pBuf
->
allocSize
,
TAOS_SYSTEM_ERROR
(
errno
));
}
return
NULL
!=
pBuf
->
pData
;
}
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
de7e24b4
...
...
@@ -149,7 +149,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pTaskInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"QI
nfo
:0x%"
PRIx64
"-%p qhandle is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
qError
(
"QI
D
:0x%"
PRIx64
"-%p qhandle is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
pTaskInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
pTaskInfo
->
code
;
...
...
@@ -160,7 +160,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
}
if
(
isTaskKilled
(
pTaskInfo
))
{
qDebug
(
"QI
nfo
:0x%"
PRIx64
" it is already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"QI
D
:0x%"
PRIx64
" it is already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -169,12 +169,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
publishQueryAbortEvent
(
pTaskInfo
,
ret
);
pTaskInfo
->
code
=
ret
;
qDebug
(
"QI
nfo
:0x%"
PRIx64
" query abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
qDebug
(
"QI
D
:0x%"
PRIx64
" query abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
return
pTaskInfo
->
code
;
}
qDebug
(
"QI
nfo
:0x%"
PRIx64
" query task is launched"
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"QI
D
:0x%"
PRIx64
" query task is launched"
,
GET_TASKID
(
pTaskInfo
));
bool
newgroup
=
false
;
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
...
...
@@ -190,8 +190,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
*
useconds
=
pTaskInfo
->
cost
.
elapsedTime
;
}
qDebug
(
"QInfo:0x%"
PRIx64
" query paused, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d"
,
GET_TASKID
(
pTaskInfo
),
0
,
0L
,
0
);
int32_t
current
=
(
*
pRes
!=
NULL
)
?
(
*
pRes
)
->
info
.
rows
:
0
;
pTaskInfo
->
totalRows
+=
current
;
qDebug
(
"QID:0x%"
PRIx64
" task paused, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d"
,
GET_TASKID
(
pTaskInfo
),
current
,
pTaskInfo
->
totalRows
,
0
);
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
return
pTaskInfo
->
code
;
...
...
@@ -200,14 +203,14 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int32_t
qRetrieveQueryResultInfo
(
qTaskInfo_t
qinfo
,
bool
*
buildRes
,
void
*
pRspContext
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
)
)
{
if
(
pQInfo
==
NULL
)
{
qError
(
"QInfo invalid qhandle"
);
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
*
buildRes
=
false
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
qDebug
(
"QI
nfo
:0x%"
PRIx64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
qDebug
(
"QI
D
:0x%"
PRIx64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
return
pQInfo
->
code
;
}
...
...
@@ -227,11 +230,11 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo
assert
(
pQInfo
->
rspContext
==
NULL
);
if
(
pQInfo
->
dataReady
==
QUERY_RESULT_READY
)
{
*
buildRes
=
true
;
qDebug
(
"QI
nfo
:0x%"
PRIx64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQueryAttr
->
resultRowSize
,
qDebug
(
"QI
D
:0x%"
PRIx64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQueryAttr
->
resultRowSize
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
tstrerror
(
pQInfo
->
code
));
}
else
{
*
buildRes
=
false
;
qDebug
(
"QI
nfo
:0x%"
PRIx64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
qDebug
(
"QI
D
:0x%"
PRIx64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
pQInfo
->
rspContext
=
pRspContext
;
assert
(
pQInfo
->
rspContext
!=
NULL
);
}
...
...
@@ -251,18 +254,18 @@ void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) {
}
int32_t
qKillTask
(
qTaskInfo_t
qinfo
)
{
S
QInfo
*
pQInfo
=
(
SQ
Info
*
)
qinfo
;
S
ExecTaskInfo
*
pTaskInfo
=
(
SExecTask
Info
*
)
qinfo
;
if
(
p
QInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
)
)
{
if
(
p
TaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"QI
nfo:0x%"
PRIx64
" query killed"
,
pQInfo
->
q
Id
);
set
QueryKilled
(
pQ
Info
);
qDebug
(
"QI
D:0x%"
PRIx64
" execTask killed"
,
pTaskInfo
->
id
.
query
Id
);
set
TaskKilled
(
pTask
Info
);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while
(
p
Q
Info
->
owner
!=
0
)
{
while
(
p
Task
Info
->
owner
!=
0
)
{
taosMsleep
(
100
);
}
...
...
@@ -270,14 +273,14 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
}
int32_t
qAsyncKillTask
(
qTaskInfo_t
qinfo
)
{
S
QInfo
*
pQInfo
=
(
SQ
Info
*
)
qinfo
;
S
ExecTaskInfo
*
pTaskInfo
=
(
SExecTask
Info
*
)
qinfo
;
if
(
p
QInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
)
)
{
if
(
p
TaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"QI
nfo:0x%"
PRIx64
" query async killed"
,
pQInfo
->
q
Id
);
set
QueryKilled
(
pQ
Info
);
qDebug
(
"QI
D:0x%"
PRIx64
" query async killed"
,
pTaskInfo
->
id
.
query
Id
);
set
TaskKilled
(
pTask
Info
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -292,15 +295,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
return
isTaskKilled
(
pTaskInfo
)
||
Q_STATUS_EQUAL
(
pTaskInfo
->
status
,
TASK_OVER
);
}
void
qDestroyTask
(
qTaskInfo_t
qHandle
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qHandle
;
if
(
!
isValidQInfo
(
pQInfo
))
{
return
;
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
qDebug
(
"QID:0x%"
PRIx64
" execTask completed, numOfRows:%"
PRId64
,
pTaskInfo
->
id
.
queryId
,
pTaskInfo
->
totalRows
);
qDebug
(
"QInfo:0x%"
PRIx64
" query completed"
,
pQInfo
->
qId
);
queryCostStatis
(
pQInfo
);
// print the query cost summary
doDestroyTask
(
pQInfo
);
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
doDestroyTask
(
pTaskInfo
);
}
void
*
qOpenTaskMgmt
(
int32_t
vgId
)
{
...
...
@@ -381,7 +381,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
STaskMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QI
nfo
:0x%"
PRIx64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
qError
(
"QI
D
:0x%"
PRIx64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
...
...
@@ -389,7 +389,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
qError
(
"QI
nfo
:0x%"
PRIx64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
qError
(
"QI
D
:0x%"
PRIx64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
else
{
...
...
@@ -445,7 +445,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
set
Query
Killed(pQInfo);
set
Task
Killed(pQInfo);
// wait query stop
int32_t loop = 0;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
de7e24b4
...
...
@@ -2432,9 +2432,9 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
}
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
if
(
IS_QUERY_KILLED
(
pTaskInfo
))
{
return
true
;
}
//
if (IS_QUERY_KILLED(pTaskInfo)) {
//
return true;
//
}
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
...
...
@@ -2444,13 +2444,13 @@ bool isTaskKilled(SExecTaskInfo *pTaskInfo) {
assert
(
pTaskInfo
->
cost
.
start
!=
0
);
// qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
// ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
return
true
;
//
return true;
}
return
false
;
}
void
set
QueryKilled
(
SQInfo
*
pQInfo
)
{
pQ
Info
->
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;}
void
set
TaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
pTask
Info
->
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;}
//static bool isFixedOutputQuery(STaskAttr* pQueryAttr) {
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
...
...
@@ -4420,33 +4420,32 @@ void calculateOperatorProfResults(SQInfo* pQInfo) {
taosArrayDestroy
(
opStack
);
}
void
queryCostStatis
(
SQInfo
*
pQInfo
)
{
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
STaskCostInfo
*
pSummary
=
&
pQInfo
->
summary
;
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
uint64_t
hashSize
=
taosHashGetMemSize
(
pQInfo
->
runtimeEnv
.
pResultRowHashTable
);
hashSize
+=
taosHashGetMemSize
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
map
);
pSummary
->
hashSize
=
hashSize
;
//
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
//
hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
//
pSummary->hashSize = hashSize;
// add the merge time
pSummary
->
elapsedTime
+=
pSummary
->
firstStageMergeTime
;
SResultRowPool
*
p
=
pQInfo
->
runtimeEnv
.
pool
;
if
(
p
!=
NULL
)
{
pSummary
->
winInfoSize
=
getResultRowPoolMemSize
(
p
);
pSummary
->
numOfTimeWindows
=
getNumOfAllocatedResultRows
(
p
);
}
else
{
pSummary
->
winInfoSize
=
0
;
pSummary
->
numOfTimeWindows
=
0
;
}
calculateOperatorProfResults
(
pQInfo
);
//qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
// "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
// pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
// pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
// SResultRowPool* p = pTaskInfo->pool;
// if (p != NULL) {
// pSummary->winInfoSize = getResultRowPoolMemSize(p);
// pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
// } else {
// pSummary->winInfoSize = 0;
// pSummary->numOfTimeWindows = 0;
// }
//
// calculateOperatorProfResults(pQInfo);
qDebug
(
"QID:0x%"
PRIx64
" :cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
pTaskInfo
->
id
.
queryId
,
pSummary
->
elapsedTime
,
pSummary
->
firstStageMergeTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
//
//qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
...
...
@@ -7733,7 +7732,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) {
SExecTaskInfo
*
pTaskInfo
=
calloc
(
1
,
sizeof
(
SExecTaskInfo
));
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pthread_mutex_init
(
&
pTaskInfo
->
lock
,
NULL
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
id
.
queryId
=
queryId
;
return
pTaskInfo
;
...
...
@@ -8673,229 +8671,6 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return
((
SQInfo
*
)
qHandle
)
->
qId
==
qId
;
}
SQInfo
*
createQInfoImpl
(
SQueryTableReq
*
pQueryMsg
,
SGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
SExprInfo
*
pSecExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SColumnInfo
*
pTagCols
,
SFilterInfo
*
pFilters
,
int32_t
vgId
,
char
*
sql
,
uint64_t
qId
,
struct
SUdfInfo
*
pUdfInfo
)
{
int16_t
numOfCols
=
pQueryMsg
->
numOfCols
;
int16_t
numOfOutput
=
pQueryMsg
->
numOfOutput
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
calloc
(
1
,
sizeof
(
SQInfo
));
if
(
pQInfo
==
NULL
)
{
goto
_cleanup_qinfo
;
}
pQInfo
->
qId
=
qId
;
pQInfo
->
startExecTs
=
0
;
pQInfo
->
runtimeEnv
.
pUdfInfo
=
pUdfInfo
;
// to make sure third party won't overwrite this structure
pQInfo
->
signature
=
pQInfo
;
STaskAttr
*
pQueryAttr
=
&
pQInfo
->
query
;
pQInfo
->
runtimeEnv
.
pQueryAttr
=
pQueryAttr
;
pQueryAttr
->
tableGroupInfo
=
*
pTableGroupInfo
;
pQueryAttr
->
numOfCols
=
numOfCols
;
pQueryAttr
->
numOfOutput
=
numOfOutput
;
pQueryAttr
->
limit
.
limit
=
pQueryMsg
->
limit
;
pQueryAttr
->
limit
.
offset
=
pQueryMsg
->
offset
;
pQueryAttr
->
order
.
order
=
pQueryMsg
->
order
;
pQueryAttr
->
order
.
col
.
info
.
colId
=
pQueryMsg
->
orderColId
;
pQueryAttr
->
pExpr1
=
pExprs
;
pQueryAttr
->
pExpr2
=
pSecExprs
;
pQueryAttr
->
numOfExpr2
=
pQueryMsg
->
secondStageOutput
;
pQueryAttr
->
pGroupbyExpr
=
pGroupbyExpr
;
memcpy
(
&
pQueryAttr
->
interval
,
&
pQueryMsg
->
interval
,
sizeof
(
pQueryAttr
->
interval
));
pQueryAttr
->
fillType
=
pQueryMsg
->
fillType
;
pQueryAttr
->
numOfTags
=
pQueryMsg
->
numOfTags
;
pQueryAttr
->
tagColList
=
pTagCols
;
pQueryAttr
->
prjInfo
.
vgroupLimit
=
pQueryMsg
->
vgroupLimit
;
pQueryAttr
->
prjInfo
.
ts
=
(
pQueryMsg
->
order
==
TSDB_ORDER_ASC
)
?
INT64_MIN
:
INT64_MAX
;
// pQueryAttr->sw = pQueryMsg->sw;
pQueryAttr
->
vgId
=
vgId
;
pQueryAttr
->
stableQuery
=
pQueryMsg
->
stableQuery
;
pQueryAttr
->
topBotQuery
=
pQueryMsg
->
topBotQuery
;
pQueryAttr
->
groupbyColumn
=
pQueryMsg
->
groupbyColumn
;
pQueryAttr
->
hasTagResults
=
pQueryMsg
->
hasTagResults
;
pQueryAttr
->
timeWindowInterpo
=
pQueryMsg
->
timeWindowInterpo
;
pQueryAttr
->
queryBlockDist
=
pQueryMsg
->
queryBlockDist
;
pQueryAttr
->
stabledev
=
pQueryMsg
->
stabledev
;
pQueryAttr
->
tsCompQuery
=
pQueryMsg
->
tsCompQuery
;
pQueryAttr
->
simpleAgg
=
pQueryMsg
->
simpleAgg
;
pQueryAttr
->
pointInterpQuery
=
pQueryMsg
->
pointInterpQuery
;
pQueryAttr
->
needReverseScan
=
pQueryMsg
->
needReverseScan
;
pQueryAttr
->
stateWindow
=
pQueryMsg
->
stateWindow
;
pQueryAttr
->
vgId
=
vgId
;
// pQueryAttr->pFilters = pFilters;
pQueryAttr
->
tableCols
=
calloc
(
numOfCols
,
sizeof
(
SSingleColumnFilterInfo
));
if
(
pQueryAttr
->
tableCols
==
NULL
)
{
goto
_cleanup
;
}
pQueryAttr
->
srcRowSize
=
0
;
pQueryAttr
->
maxTableColumnWidth
=
0
;
for
(
int16_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
pQueryAttr
->
tableCols
[
i
]
=
pQueryMsg
->
tableCols
[
i
];
// pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pQueryMsg->tableCols[i].flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters);
pQueryAttr
->
srcRowSize
+=
pQueryAttr
->
tableCols
[
i
].
bytes
;
if
(
pQueryAttr
->
maxTableColumnWidth
<
pQueryAttr
->
tableCols
[
i
].
bytes
)
{
pQueryAttr
->
maxTableColumnWidth
=
pQueryAttr
->
tableCols
[
i
].
bytes
;
}
}
for
(
int16_t
col
=
0
;
col
<
numOfOutput
;
++
col
)
{
assert
(
pExprs
[
col
].
base
.
resSchema
.
bytes
>
0
);
pQueryAttr
->
resultRowSize
+=
pExprs
[
col
].
base
.
resSchema
.
bytes
;
// keep the tag length
if
(
TSDB_COL_IS_TAG
(
pExprs
[
col
].
base
.
pColumns
->
flag
))
{
pQueryAttr
->
tagLen
+=
pExprs
[
col
].
base
.
resSchema
.
bytes
;
}
// if (pExprs[col].base.flist.filterInfo) {
// ++pQueryAttr->havingNum;
// }
}
doUpdateExprColumnIndex
(
pQueryAttr
);
if
(
pSecExprs
!=
NULL
)
{
int32_t
resultRowSize
=
0
;
// calculate the result row size
for
(
int16_t
col
=
0
;
col
<
pQueryAttr
->
numOfExpr2
;
++
col
)
{
assert
(
pSecExprs
[
col
].
base
.
resSchema
.
bytes
>
0
);
resultRowSize
+=
pSecExprs
[
col
].
base
.
resSchema
.
bytes
;
}
if
(
resultRowSize
>
pQueryAttr
->
resultRowSize
)
{
pQueryAttr
->
resultRowSize
=
resultRowSize
;
}
}
if
(
pQueryAttr
->
fillType
!=
TSDB_FILL_NONE
)
{
pQueryAttr
->
fillVal
=
malloc
(
sizeof
(
int64_t
)
*
pQueryAttr
->
numOfOutput
);
if
(
pQueryAttr
->
fillVal
==
NULL
)
{
goto
_cleanup
;
}
// the first column is the timestamp
memcpy
(
pQueryAttr
->
fillVal
,
(
char
*
)
pQueryMsg
->
fillVal
,
pQueryAttr
->
numOfOutput
*
sizeof
(
int64_t
));
}
size_t
numOfGroups
=
0
;
if
(
pTableGroupInfo
->
pGroupList
!=
NULL
)
{
numOfGroups
=
taosArrayGetSize
(
pTableGroupInfo
->
pGroupList
);
STableGroupInfo
*
pTableqinfo
=
&
pQInfo
->
runtimeEnv
.
tableqinfoGroupInfo
;
pTableqinfo
->
pGroupList
=
taosArrayInit
(
numOfGroups
,
POINTER_BYTES
);
pTableqinfo
->
numOfTables
=
pTableGroupInfo
->
numOfTables
;
pTableqinfo
->
map
=
taosHashInit
(
pTableGroupInfo
->
numOfTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
}
pQInfo
->
pBuf
=
calloc
(
pTableGroupInfo
->
numOfTables
,
sizeof
(
STableQueryInfo
));
if
(
pQInfo
->
pBuf
==
NULL
)
{
goto
_cleanup
;
}
pQInfo
->
dataReady
=
QUERY_RESULT_NOT_READY
;
pQInfo
->
rspContext
=
NULL
;
pQInfo
->
sql
=
sql
;
pthread_mutex_init
(
&
pQInfo
->
lock
,
NULL
);
tsem_init
(
&
pQInfo
->
ready
,
0
,
0
);
pQueryAttr
->
window
=
pQueryMsg
->
window
;
updateDataCheckOrder
(
pQInfo
,
pQueryMsg
,
pQueryAttr
->
stableQuery
);
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
STimeWindow
window
=
pQueryAttr
->
window
;
int32_t
index
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
pa
=
taosArrayGetP
(
pQueryAttr
->
tableGroupInfo
.
pGroupList
,
i
);
size_t
s
=
taosArrayGetSize
(
pa
);
SArray
*
p1
=
taosArrayInit
(
s
,
POINTER_BYTES
);
if
(
p1
==
NULL
)
{
goto
_cleanup
;
}
taosArrayPush
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
pGroupList
,
&
p1
);
for
(
int32_t
j
=
0
;
j
<
s
;
++
j
)
{
// STableKeyInfo* info = taosArrayGet(pa, j);
// window.skey = info->lastKey;
//
// void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
// STableQueryInfo* item = createTableQueryInfo(pQueryAttr, info->pTable, pQueryAttr->groupbyColumn, window, buf);
// if (item == NULL) {
// goto _cleanup;
// }
//
// item->groupIndex = i;
// taosArrayPush(p1, &item);
// STableId* id = TSDB_TABLEID(info->pTable);
// taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
// index += 1;
}
}
colIdCheck
(
pQueryAttr
,
pQInfo
->
qId
);
// int32_t functionId = getExprFunctionId(&pExpr[0]);
// pQInfo->query.queryBlockDist = (functionId == FUNCTION_BLKINFO);
//qDebug("qmsg:%p vgId:%d, QInfo:0x%" PRIx64 "-%p created", pQueryMsg, pQInfo->query.vgId, pQInfo->qId, pQInfo);
return
pQInfo
;
_cleanup_qinfo:
// tsdbDestroyTableGroup(pTableGroupInfo);
if
(
pGroupbyExpr
!=
NULL
)
{
taosArrayDestroy
(
pGroupbyExpr
->
columnInfo
);
free
(
pGroupbyExpr
);
}
tfree
(
pTagCols
);
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pExprs
[
i
];
if
(
pExprInfo
->
pExpr
!=
NULL
)
{
tExprTreeDestroy
(
pExprInfo
->
pExpr
,
NULL
);
pExprInfo
->
pExpr
=
NULL
;
}
// if (pExprInfo->base.flist.filterInfo) {
// freeColumnFilterInfo(pExprInfo->base.flist.filterInfo, pExprInfo->base.flist.numOfFilters);
// }
}
tfree
(
pExprs
);
// filterFreeInfo(pFilters);
_cleanup:
doDestroyTask
(
pQInfo
);
return
NULL
;
}
bool
isValidQInfo
(
void
*
param
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
param
;
if
(
pQInfo
==
NULL
)
{
return
false
;
}
/*
* pQInfo->signature may be changed by another thread, so we assign value of signature
* into local variable, then compare by using local variable
*/
uint64_t
sig
=
(
uint64_t
)
pQInfo
->
signature
;
return
(
sig
==
(
uint64_t
)
pQInfo
);
}
int32_t
initQInfo
(
STsBufInfo
*
pTsBufInfo
,
void
*
tsdb
,
void
*
sourceOptr
,
SQInfo
*
pQInfo
,
STaskParam
*
param
,
char
*
start
,
int32_t
prevResultLen
,
void
*
merger
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -8957,7 +8732,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
_error:
// table query ref will be decrease during error handling
doDestroyTask
(
pQInfo
);
//
doDestroyTask(pQInfo);
return
code
;
}
...
...
@@ -9038,36 +8813,14 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
return
NULL
;
}
void
doDestroyTask
(
SQInfo
*
pQInfo
)
{
if
(
!
isValidQInfo
(
pQInfo
))
{
return
;
}
//qDebug("QInfo:0x%"PRIx64" start to free QInfo", pQInfo->qId);
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
releaseQueryBuf
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
);
doDestroyTableQueryInfo
(
&
pRuntimeEnv
->
tableqinfoGroupInfo
);
teardownQueryRuntimeEnv
(
&
pQInfo
->
runtimeEnv
);
STaskAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
freeQueryAttr
(
pQueryAttr
);
// tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo);
tfree
(
pQInfo
->
pBuf
);
tfree
(
pQInfo
->
sql
);
taosArrayDestroy
(
pQInfo
->
summary
.
queryProfEvents
);
taosHashCleanup
(
pQInfo
->
summary
.
operatorProfResults
);
taosArrayDestroy
(
pRuntimeEnv
->
groupResInfo
.
pRows
);
pQInfo
->
signature
=
0
;
//qDebug("QInfo:0x%"PRIx64" QInfo is freed", pQInfo->qId);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"QID:0x%"
PRIx64
" start to free execTask"
,
pTaskInfo
->
id
.
queryId
);
doDestroyTableQueryInfo
(
&
pTaskInfo
->
tableqinfoGroupInfo
);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
tfree
(
pQInfo
);
qDebug
(
"QID:0x%"
PRIx64
" execTask is freed"
,
pTaskInfo
->
id
.
queryId
);
tfree
(
pTaskInfo
);
}
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
,
int8_t
compressed
,
int32_t
*
compLen
)
{
...
...
source/libs/planner/src/logicPlan.c
浏览文件 @
de7e24b4
...
...
@@ -413,7 +413,6 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree
(
pQueryTableInfo
->
tableName
);
}
printf
(
"----------->Free:%p
\n
"
,
pQueryNode
->
pExpr
);
taosArrayDestroy
(
pQueryNode
->
pExpr
);
tfree
(
pQueryNode
->
pExtInfo
);
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
de7e24b4
...
...
@@ -492,7 +492,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
SInputData
inputData
=
{.
pData
=
pRes
,
.
pTableRetrieveTsMap
=
NULL
};
code
=
dsPutDataBlock
(
sinkHandle
,
&
inputData
,
&
qcontinue
);
if
(
code
)
{
QW_TASK_ELOG
(
"dsPutDataBlock failed, code:%
x"
,
code
);
QW_TASK_ELOG
(
"dsPutDataBlock failed, code:%
s"
,
tstrerror
(
code
)
);
QW_ERR_JRET
(
code
);
}
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
de7e24b4
...
...
@@ -242,7 +242,7 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc
=
(
SRpcInfo
*
)
calloc
(
1
,
sizeof
(
SRpcInfo
));
if
(
pRpc
==
NULL
)
return
NULL
;
if
(
pInit
->
label
)
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
strl
en
(
pInit
->
label
));
if
(
pInit
->
label
)
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
tListL
en
(
pInit
->
label
));
pRpc
->
connType
=
pInit
->
connType
;
if
(
pRpc
->
connType
==
TAOS_CONN_CLIENT
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录