Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
119fcee9
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
119fcee9
编写于
3月 26, 2021
作者:
H
haojun Liao
提交者:
GitHub
3月 26, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #5570 from taosdata/feature/qrefactor
[td-225] update log.
上级
211d323a
6fc5e713
变更
22
展开全部
隐藏空白更改
内联
并排
Showing
22 changed file
with
210 addition
and
223 deletion
+210
-223
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+1
-1
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+3
-3
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+2
-2
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+1
-1
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+1
-1
src/client/src/tscProfile.c
src/client/src/tscProfile.c
+2
-2
src/client/src/tscServer.c
src/client/src/tscServer.c
+11
-9
src/client/src/tscSql.c
src/client/src/tscSql.c
+4
-4
src/client/src/tscSub.c
src/client/src/tscSub.c
+3
-3
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+3
-3
src/inc/query.h
src/inc/query.h
+1
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+3
-2
src/inc/tsdb.h
src/inc/tsdb.h
+3
-3
src/query/inc/qResultbuf.h
src/query/inc/qResultbuf.h
+2
-2
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+1
-0
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+63
-62
src/query/src/qPercentile.c
src/query/src/qPercentile.c
+1
-1
src/query/src/qResultbuf.c
src/query/src/qResultbuf.c
+6
-6
src/query/src/qUtil.c
src/query/src/qUtil.c
+4
-4
src/query/src/queryMain.c
src/query/src/queryMain.c
+20
-18
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+57
-57
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+18
-38
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
119fcee9
...
...
@@ -284,7 +284,7 @@ typedef struct {
char
*
pRsp
;
int32_t
rspType
;
int32_t
rspLen
;
uint64_t
q
handle
;
uint64_t
q
Id
;
int64_t
useconds
;
int64_t
offset
;
// offset value from vnode during projection query of stable
int32_t
row
;
...
...
src/client/src/tscAsync.c
浏览文件 @
119fcee9
...
...
@@ -160,8 +160,8 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
((
pRes
->
q
handle
==
0
||
numOfRows
!=
0
)
&&
pCmd
->
command
<
TSDB_SQL_LOCAL
)
{
if
(
pRes
->
q
handle
==
0
&&
numOfRows
!=
0
)
{
if
((
pRes
->
q
Id
==
0
||
numOfRows
!=
0
)
&&
pCmd
->
command
<
TSDB_SQL_LOCAL
)
{
if
(
pRes
->
q
Id
==
0
&&
numOfRows
!=
0
)
{
tscError
(
"qhandle is NULL"
);
}
else
{
pRes
->
code
=
numOfRows
;
...
...
@@ -208,7 +208,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
pSql
->
fetchFp
=
fp
;
pSql
->
fp
=
tscAsyncFetchRowsProxy
;
if
(
pRes
->
q
handle
==
0
)
{
if
(
pRes
->
q
Id
==
0
)
{
tscError
(
"qhandle is NULL"
);
pRes
->
code
=
TSDB_CODE_TSC_INVALID_QHANDLE
;
pSql
->
param
=
param
;
...
...
src/client/src/tscLocal.c
浏览文件 @
119fcee9
...
...
@@ -309,7 +309,7 @@ TAOS_ROW tscFetchRow(void *param) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pRes
->
q
handle
==
0
||
if
(
pRes
->
q
Id
==
0
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pCmd
->
command
==
TSDB_SQL_INSERT
)
{
return
NULL
;
...
...
@@ -905,7 +905,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
* set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to
* free allocated resources and remove the SqlObj from sql query linked list
*/
pRes
->
q
handle
=
0x1
;
pRes
->
q
Id
=
0x1
;
pRes
->
numOfRows
=
0
;
}
else
if
(
pCmd
->
command
==
TSDB_SQL_SHOW_CREATE_TABLE
)
{
pRes
->
code
=
tscProcessShowCreateTable
(
pSql
);
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
119fcee9
...
...
@@ -1607,7 +1607,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
tscDestroyLocalMerger
(
pObj
);
}
pRes
->
q
handle
=
1
;
// hack to pass the safety check in fetch_row function
pRes
->
q
Id
=
1
;
// hack to pass the safety check in fetch_row function
pRes
->
numOfRows
=
0
;
pRes
->
row
=
0
;
...
...
src/client/src/tscPrepare.c
浏览文件 @
119fcee9
...
...
@@ -903,7 +903,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pRes
->
q
handle
=
0
;
pRes
->
q
Id
=
0
;
pRes
->
numOfRows
=
1
;
strtolower
(
pSql
->
sqlstr
,
sql
);
...
...
src/client/src/tscProfile.c
浏览文件 @
119fcee9
...
...
@@ -249,8 +249,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
pQdesc
->
stime
=
htobe64
(
pSql
->
stime
);
pQdesc
->
queryId
=
htonl
(
pSql
->
queryId
);
//pQdesc->useconds = htobe64(pSql->res.useconds);
pQdesc
->
useconds
=
htobe64
(
now
-
pSql
->
stime
);
pQdesc
->
qHandle
=
htobe64
(
pSql
->
res
.
q
handle
);
pQdesc
->
useconds
=
htobe64
(
now
-
pSql
->
stime
);
// use local time instead of sever rsp elapsed time
pQdesc
->
qHandle
=
htobe64
(
pSql
->
res
.
q
Id
);
pHeartbeat
->
numOfQueries
++
;
pQdesc
++
;
...
...
src/client/src/tscServer.c
浏览文件 @
119fcee9
...
...
@@ -508,7 +508,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
pRetrieveMsg
->
free
=
htons
(
pQueryInfo
->
type
);
pRetrieveMsg
->
q
handle
=
htobe64
(
pSql
->
res
.
qhandle
);
pRetrieveMsg
->
q
Id
=
htobe64
(
pSql
->
res
.
qId
);
// todo valid the vgroupId at the client side
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -520,7 +520,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
assert
(
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
>
0
&&
vgIndex
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
);
tscDebug
(
"%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%"
PRIX64
,
pSql
,
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
,
vgIndex
,
pSql
->
res
.
q
handle
);
tscDebug
(
"%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%"
PRIX64
,
pSql
,
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
,
vgIndex
,
pSql
->
res
.
q
Id
);
}
else
{
int32_t
numOfVgroups
=
(
int32_t
)
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
);
assert
(
vgIndex
>=
0
&&
vgIndex
<
numOfVgroups
);
...
...
@@ -528,12 +528,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SVgroupTableInfo
*
pTableIdList
=
taosArrayGet
(
pTableMetaInfo
->
pVgroupTables
,
vgIndex
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableIdList
->
vgInfo
.
vgId
);
tscDebug
(
"%p build fetch msg from vgId:%d, vgIndex:%d, q
handle:%"
PRIX64
,
pSql
,
pTableIdList
->
vgInfo
.
vgId
,
vgIndex
,
pSql
->
res
.
qhandle
);
tscDebug
(
"%p build fetch msg from vgId:%d, vgIndex:%d, q
Id:%"
PRIu64
,
pSql
,
pTableIdList
->
vgInfo
.
vgId
,
vgIndex
,
pSql
->
res
.
qId
);
}
}
else
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d, q
handle:%"
PRIX64
,
pSql
,
pTableMeta
->
vgId
,
pSql
->
res
.
qhandle
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d, q
Id:%"
PRIu64
,
pSql
,
pTableMeta
->
vgId
,
pSql
->
res
.
qId
);
}
pSql
->
cmd
.
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
...
...
@@ -1583,7 +1583,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SRetrieveTableMsg
*
pRetrieveMsg
=
(
SRetrieveTableMsg
*
)
pCmd
->
payload
;
pRetrieveMsg
->
q
handle
=
htobe64
(
pSql
->
res
.
qhandle
);
pRetrieveMsg
->
q
Id
=
htobe64
(
pSql
->
res
.
qId
);
pRetrieveMsg
->
free
=
htons
(
pQueryInfo
->
type
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2134,7 +2134,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
pShow
=
(
SShowRsp
*
)
pRes
->
pRsp
;
pShow
->
qhandle
=
htobe64
(
pShow
->
qhandle
);
pRes
->
q
handle
=
pShow
->
qhandle
;
pRes
->
q
Id
=
pShow
->
qhandle
;
tscResetForNextRetrieve
(
pRes
);
pMetaMsg
=
&
(
pShow
->
tableMeta
);
...
...
@@ -2316,11 +2316,12 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQueryTableRsp
*
pQuery
=
(
SQueryTableRsp
*
)
pRes
->
pRsp
;
pQuery
->
q
handle
=
htobe64
(
pQuery
->
qhandle
);
pRes
->
q
handle
=
pQuery
->
qhandle
;
pQuery
->
q
Id
=
htobe64
(
pQuery
->
qId
);
pRes
->
q
Id
=
pQuery
->
qId
;
pRes
->
data
=
NULL
;
tscResetForNextRetrieve
(
pRes
);
tscDebug
(
"%p query rsp received, qId:%"
PRIu64
,
pSql
,
pRes
->
qId
);
return
0
;
}
...
...
@@ -2378,7 +2379,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
}
pRes
->
row
=
0
;
tscDebug
(
"%p numOfRows:%d, offset:%"
PRId64
", complete:%d"
,
pSql
,
pRes
->
numOfRows
,
pRes
->
offset
,
pRes
->
completed
);
tscDebug
(
"%p numOfRows:%d, offset:%"
PRId64
", complete:%d, qId:%"
PRIu64
,
pSql
,
pRes
->
numOfRows
,
pRes
->
offset
,
pRes
->
completed
,
pRes
->
qId
);
return
0
;
}
...
...
src/client/src/tscSql.c
浏览文件 @
119fcee9
...
...
@@ -476,7 +476,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pRes
->
q
handle
==
0
||
if
(
pRes
->
q
Id
==
0
||
pRes
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pCmd
->
command
==
TSDB_SQL_INSERT
)
{
...
...
@@ -508,7 +508,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pRes
->
q
handle
==
0
||
if
(
pRes
->
q
Id
==
0
||
pRes
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pCmd
->
command
==
TSDB_SQL_INSERT
)
{
...
...
@@ -554,7 +554,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pRes
==
NULL
||
pRes
->
q
handle
==
0
)
{
if
(
pRes
==
NULL
||
pRes
->
q
Id
==
0
)
{
return
true
;
}
...
...
@@ -1050,7 +1050,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql()
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes
->
q
handle
=
0
;
pRes
->
q
Id
=
0
;
free
(
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
src/client/src/tscSub.c
浏览文件 @
119fcee9
...
...
@@ -149,7 +149,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
}
strtolower
(
pSql
->
sqlstr
,
pSql
->
sqlstr
);
pRes
->
q
handle
=
0
;
pRes
->
q
Id
=
0
;
pRes
->
numOfRows
=
1
;
code
=
tscAllocPayload
(
pCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
);
...
...
@@ -448,7 +448,7 @@ SSqlObj* recreateSqlObj(SSub* pSub) {
return
NULL
;
}
pRes
->
q
handle
=
0
;
pRes
->
q
Id
=
0
;
pRes
->
numOfRows
=
1
;
int
code
=
tscAllocPayload
(
pCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
);
...
...
@@ -546,7 +546,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
uint32_t
type
=
pQueryInfo
->
type
;
tscFreeSqlResult
(
pSql
);
pRes
->
numOfRows
=
1
;
pRes
->
q
handle
=
0
;
pRes
->
q
Id
=
0
;
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pQueryInfo
->
type
=
type
;
...
...
src/client/src/tscSubquery.c
浏览文件 @
119fcee9
...
...
@@ -1840,7 +1840,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pSql
->
res
.
q
handle
=
0x1
;
pSql
->
res
.
q
Id
=
0x1
;
assert
(
pSql
->
res
.
numOfRows
==
0
);
if
(
pSql
->
pSubs
==
NULL
)
{
...
...
@@ -2440,7 +2440,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SColumnModel
*
pModel
=
NULL
;
SColumnModel
*
pFinalModel
=
NULL
;
pRes
->
q
handle
=
0x1
;
// hack the qhandle check
pRes
->
q
Id
=
0x1
;
// hack the qhandle check
const
uint32_t
nBufferSize
=
(
1u
<<
16u
);
// 64KB
...
...
@@ -2988,7 +2988,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscDebug
(
"%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data"
,
trsupport
->
pParentSql
,
pSql
,
pVgroup
->
epAddr
[
0
].
fqdn
,
pVgroup
->
vgId
,
trsupport
->
subqueryIndex
);
if
(
pSql
->
res
.
q
handle
==
0
)
{
// qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
if
(
pSql
->
res
.
q
Id
==
0
)
{
// qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack
(
param
,
pSql
,
0
);
}
else
{
taos_fetch_rows_a
(
tres
,
tscRetrieveFromDnodeCallBack
,
param
);
...
...
src/inc/query.h
浏览文件 @
119fcee9
...
...
@@ -38,7 +38,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs
* @param qinfo
* @return
*/
bool
qTableQuery
(
qinfo_t
qinfo
);
bool
qTableQuery
(
qinfo_t
qinfo
,
uint64_t
*
qId
);
/**
* Retrieve the produced results information, if current query is not paused or completed,
...
...
src/inc/taosmsg.h
浏览文件 @
119fcee9
...
...
@@ -514,12 +514,13 @@ typedef struct {
typedef
struct
{
int32_t
code
;
u
int64_t
qhandle
;
// query handle
u
nion
{
uint64_t
qhandle
;
uint64_t
qId
;}
;
// query handle
}
SQueryTableRsp
;
// todo: the show handle should be replaced with id
typedef
struct
{
SMsgHead
header
;
u
int64_t
qhandle
;
u
nion
{
uint64_t
qhandle
;
uint64_t
qId
;};
// query handle
uint16_t
free
;
}
SRetrieveTableMsg
;
...
...
src/inc/tsdb.h
浏览文件 @
119fcee9
...
...
@@ -245,7 +245,7 @@ typedef struct {
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT
*
tsdbQueryTables
(
STsdbRepo
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
void
*
qinfo
,
TsdbQueryHandleT
*
tsdbQueryTables
(
STsdbRepo
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
SMemRef
*
pRef
);
/**
...
...
@@ -258,7 +258,7 @@ TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
* @param tableInfo table list.
* @return
*/
TsdbQueryHandleT
tsdbQueryLastRow
(
STsdbRepo
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfo
,
void
*
qinfo
,
TsdbQueryHandleT
tsdbQueryLastRow
(
STsdbRepo
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfo
,
uint64_t
qId
,
SMemRef
*
pRef
);
/**
...
...
@@ -277,7 +277,7 @@ SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
* @return
*/
TsdbQueryHandleT
tsdbQueryRowsInExternalWindow
(
STsdbRepo
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
,
SMemRef
*
pRef
);
uint64_t
qId
,
SMemRef
*
pRef
);
/**
...
...
src/query/inc/qResultbuf.h
浏览文件 @
119fcee9
...
...
@@ -72,7 +72,7 @@ typedef struct SDiskbasedResultBuf {
bool
comp
;
// compressed before flushed to disk
int32_t
nextPos
;
// next page flush position
const
void
*
handle
;
// for debug purpose
uint64_t
qId
;
// for debug purpose
SResultBufStatis
statis
;
}
SDiskbasedResultBuf
;
...
...
@@ -88,7 +88,7 @@ typedef struct SDiskbasedResultBuf {
* @param handle
* @return
*/
int32_t
createDiskbasedResultBuffer
(
SDiskbasedResultBuf
**
pResultBuf
,
int32_t
pagesize
,
int32_t
inMemBufSize
,
const
void
*
handle
);
int32_t
createDiskbasedResultBuffer
(
SDiskbasedResultBuf
**
pResultBuf
,
int32_t
pagesize
,
int32_t
inMemBufSize
,
uint64_t
qId
);
/**
*
...
...
src/query/inc/qUtil.h
浏览文件 @
119fcee9
...
...
@@ -25,6 +25,7 @@
} while (0)
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
...
...
src/query/src/qExecutor.c
浏览文件 @
119fcee9
此差异已折叠。
点击以展开。
src/query/src/qPercentile.c
浏览文件 @
119fcee9
...
...
@@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo
(
pBucket
);
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pBucket
->
pBuffer
,
pBucket
->
bufPageSize
,
pBucket
->
bufPageSize
*
512
,
NULL
);
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pBucket
->
pBuffer
,
pBucket
->
bufPageSize
,
pBucket
->
bufPageSize
*
512
,
1
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tMemBucketDestroy
(
pBucket
);
return
NULL
;
...
...
src/query/src/qResultbuf.c
浏览文件 @
119fcee9
...
...
@@ -9,7 +9,7 @@
#define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
int32_t
createDiskbasedResultBuffer
(
SDiskbasedResultBuf
**
pResultBuf
,
int32_t
pagesize
,
int32_t
inMemBufSize
,
const
void
*
handle
)
{
int32_t
createDiskbasedResultBuffer
(
SDiskbasedResultBuf
**
pResultBuf
,
int32_t
pagesize
,
int32_t
inMemBufSize
,
uint64_t
qId
)
{
*
pResultBuf
=
calloc
(
1
,
sizeof
(
SDiskbasedResultBuf
));
SDiskbasedResultBuf
*
pResBuf
=
*
pResultBuf
;
...
...
@@ -24,7 +24,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa
pResBuf
->
allocateId
=
-
1
;
pResBuf
->
comp
=
true
;
pResBuf
->
file
=
NULL
;
pResBuf
->
handle
=
handle
;
pResBuf
->
qId
=
qId
;
pResBuf
->
fileSize
=
0
;
// at least more than 2 pages must be in memory
...
...
@@ -43,7 +43,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa
pResBuf
->
emptyDummyIdList
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
qDebug
(
"QInfo:%
p create resBuf for output, page size:%d, inmem buf pages:%d, file:%s"
,
handle
,
pResBuf
->
pageSize
,
qDebug
(
"QInfo:%
"
PRIu64
" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s"
,
qId
,
pResBuf
->
pageSize
,
pResBuf
->
inMemPages
,
pResBuf
->
path
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -406,13 +406,13 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
}
if
(
pResultBuf
->
file
!=
NULL
)
{
qDebug
(
"QInfo:%
p
res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb"
,
pResultBuf
->
handle
,
pResultBuf
->
totalBufSize
/
1024
.
0
,
listNEles
(
pResultBuf
->
lruList
)
*
pResultBuf
->
pageSize
/
1024
.
0
,
qDebug
(
"QInfo:%
"
PRIu64
"
res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb"
,
pResultBuf
->
qId
,
pResultBuf
->
totalBufSize
/
1024
.
0
,
listNEles
(
pResultBuf
->
lruList
)
*
pResultBuf
->
pageSize
/
1024
.
0
,
pResultBuf
->
fileSize
/
1024
.
0
);
fclose
(
pResultBuf
->
file
);
}
else
{
qDebug
(
"QInfo:%
p res output buffer closed, total:%.2f Kb, no file created"
,
pResultBuf
->
handle
,
qDebug
(
"QInfo:%
"
PRIu64
" res output buffer closed, total:%.2f Kb, no file created"
,
pResultBuf
->
qId
,
pResultBuf
->
totalBufSize
/
1024
.
0
);
}
...
...
src/query/src/qUtil.c
浏览文件 @
119fcee9
...
...
@@ -468,7 +468,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
pTableQueryInfoList
=
malloc
(
POINTER_BYTES
*
size
);
if
(
pTableQueryInfoList
==
NULL
||
posList
==
NULL
||
pGroupResInfo
->
pRows
==
NULL
||
pGroupResInfo
->
pRows
==
NULL
)
{
qError
(
"QInfo:%
p failed alloc memory"
,
pRuntimeEnv
->
qinfo
);
qError
(
"QInfo:%
"
PRIu64
" failed alloc memory"
,
GET_QID
(
pRuntimeEnv
)
);
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_end
;
}
...
...
@@ -540,7 +540,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
int64_t
endt
=
taosGetTimestampMs
();
qDebug
(
"QInfo:%
p result merge completed for group:%d, elapsed time:%"
PRId64
" ms"
,
pRuntimeEnv
->
qinfo
,
qDebug
(
"QInfo:%
"
PRIu64
" result merge completed for group:%d, elapsed time:%"
PRId64
" ms"
,
GET_QID
(
pRuntimeEnv
)
,
pGroupResInfo
->
currentGroup
,
endt
-
startt
);
_end:
...
...
@@ -567,13 +567,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
break
;
}
qDebug
(
"QInfo:%
p no result in group %d, continue"
,
pRuntimeEnv
->
qinfo
,
pGroupResInfo
->
currentGroup
);
qDebug
(
"QInfo:%
"
PRIu64
" no result in group %d, continue"
,
GET_QID
(
pRuntimeEnv
)
,
pGroupResInfo
->
currentGroup
);
cleanupGroupResInfo
(
pGroupResInfo
);
incNextGroup
(
pGroupResInfo
);
}
int64_t
elapsedTime
=
taosGetTimestampUs
()
-
st
;
qDebug
(
"QInfo:%
p merge res data into group, index:%d, total group:%d, elapsed time:%"
PRId64
"us"
,
pRuntimeEnv
->
qinfo
,
qDebug
(
"QInfo:%
"
PRIu64
" merge res data into group, index:%d, total group:%d, elapsed time:%"
PRId64
"us"
,
GET_QID
(
pRuntimeEnv
)
,
pGroupResInfo
->
currentGroup
,
pGroupResInfo
->
totalGroup
,
elapsedTime
);
// pQInfo->summary.firstStageMergeTime += elapsedTime;
...
...
src/query/src/queryMain.c
浏览文件 @
119fcee9
...
...
@@ -197,29 +197,30 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return
code
;
}
bool
qTableQuery
(
qinfo_t
qinfo
)
{
bool
qTableQuery
(
qinfo_t
qinfo
,
uint64_t
*
qId
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
assert
(
pQInfo
&&
pQInfo
->
signature
==
pQInfo
);
int64_t
threadId
=
taosGetSelfPthreadId
();
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pQInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"QInfo:%
p qhandle is now executed by thread:%p"
,
pQInfo
,
(
void
*
)
curOwner
);
qError
(
"QInfo:%
"
PRIu64
"-%p qhandle is now executed by thread:%p"
,
pQInfo
->
qId
,
pQInfo
,
(
void
*
)
curOwner
);
pQInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
false
;
}
*
qId
=
pQInfo
->
qId
;
pQInfo
->
startExecTs
=
taosGetTimestampSec
();
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%
p it is already killed, abort"
,
pQInfo
);
qDebug
(
"QInfo:%
"
PRIu64
" it is already killed, abort"
,
pQInfo
->
qId
);
return
doBuildResCheck
(
pQInfo
);
}
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
if
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:%
p no table exists for query, abort"
,
pQInfo
);
qDebug
(
"QInfo:%
"
PRIu64
" no table exists for query, abort"
,
pQInfo
->
qId
);
setQueryStatus
(
pRuntimeEnv
,
QUERY_COMPLETED
);
return
doBuildResCheck
(
pQInfo
);
}
...
...
@@ -228,22 +229,22 @@ bool qTableQuery(qinfo_t qinfo) {
int32_t
ret
=
setjmp
(
pQInfo
->
runtimeEnv
.
env
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pQInfo
->
code
=
ret
;
qDebug
(
"QInfo:%
p query abort due to error/cancel occurs, code:%s"
,
pQInfo
,
tstrerror
(
pQInfo
->
code
));
qDebug
(
"QInfo:%
"
PRIu64
" query abort due to error/cancel occurs, code:%s"
,
pQInfo
->
qId
,
tstrerror
(
pQInfo
->
code
));
return
doBuildResCheck
(
pQInfo
);
}
qDebug
(
"QInfo:%
p query task is launched"
,
pQInfo
);
qDebug
(
"QInfo:%
"
PRIu64
" query task is launched"
,
pQInfo
->
qId
);
pRuntimeEnv
->
outputBuf
=
pRuntimeEnv
->
proot
->
exec
(
pRuntimeEnv
->
proot
);
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%
p query is killed"
,
pQInfo
);
qDebug
(
"QInfo:%
"
PRIu64
" query is killed"
,
pQInfo
->
qId
);
}
else
if
(
GET_NUM_OF_RESULTS
(
pRuntimeEnv
)
==
0
)
{
qDebug
(
"QInfo:%
p over, %u tables queried, %"
PRId64
" rows are returned"
,
pQInfo
,
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
,
qDebug
(
"QInfo:%
"
PRIu64
" over, %u tables queried, %"
PRId64
" rows are returned"
,
pQInfo
->
qId
,
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
,
pRuntimeEnv
->
resultInfo
.
total
);
}
else
{
qDebug
(
"QInfo:%
p
query paused, %d rows returned, numOfTotal:%"
PRId64
" rows"
,
pQInfo
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
pRuntimeEnv
->
resultInfo
.
total
+
GET_NUM_OF_RESULTS
(
pRuntimeEnv
));
qDebug
(
"QInfo:%
"
PRIu64
"
query paused, %d rows returned, numOfTotal:%"
PRId64
" rows"
,
pQInfo
->
qId
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
pRuntimeEnv
->
resultInfo
.
total
+
GET_NUM_OF_RESULTS
(
pRuntimeEnv
));
}
return
doBuildResCheck
(
pQInfo
);
...
...
@@ -253,13 +254,13 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
qError
(
"QInfo:%
p invalid qhandle"
,
pQInfo
);
qError
(
"QInfo:%
"
PRIu64
" invalid qhandle"
,
pQInfo
->
qId
);
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
*
buildRes
=
false
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
qDebug
(
"QInfo:%
p query is killed, code:0x%08x"
,
pQInfo
,
pQInfo
->
code
);
qDebug
(
"QInfo:%
"
PRIu64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
return
pQInfo
->
code
;
}
...
...
@@ -279,11 +280,11 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
assert
(
pQInfo
->
rspContext
==
NULL
);
if
(
pQInfo
->
dataReady
==
QUERY_RESULT_READY
)
{
*
buildRes
=
true
;
qDebug
(
"QInfo:%
p retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
,
pQuery
->
resultRowSize
,
qDebug
(
"QInfo:%
"
PRIu64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQuery
->
resultRowSize
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
tstrerror
(
pQInfo
->
code
));
}
else
{
*
buildRes
=
false
;
qDebug
(
"QInfo:%
p retrieve req set query return result after paused"
,
pQInfo
);
qDebug
(
"QInfo:%
"
PRIu64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
pQInfo
->
rspContext
=
pRspContext
;
assert
(
pQInfo
->
rspContext
!=
NULL
);
}
...
...
@@ -342,9 +343,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
*
continueExec
=
false
;
(
*
pRsp
)
->
completed
=
1
;
// notify no more result to client
qDebug
(
"QInfo:%"
PRIu64
" no more results to retrieve"
,
pQInfo
->
qId
);
}
else
{
*
continueExec
=
true
;
qDebug
(
"QInfo:%
p has more results to retrieve"
,
pQInfo
);
qDebug
(
"QInfo:%
"
PRIu64
" has more results to retrieve"
,
pQInfo
->
qId
);
}
// the memory should be freed if the code of pQInfo is not TSDB_CODE_SUCCESS
...
...
@@ -397,7 +399,7 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
return
;
}
qDebug
(
"QInfo:%
p query completed"
,
pQInfo
);
qDebug
(
"QInfo:%
"
PRIu64
" query completed"
,
pQInfo
->
qId
);
queryCostStatis
(
pQInfo
);
// print the query cost summary
freeQInfo
(
pQInfo
);
}
...
...
@@ -480,7 +482,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) {
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QInfo:%
p failed to add qhandle into qMgmt, since qMgmt is closed"
,
(
void
*
)
qInfo
);
qError
(
"QInfo:%
"
PRIu64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
...
...
@@ -488,7 +490,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) {
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
qError
(
"QInfo:%
p failed to add qhandle into cache, since qMgmt is colsing"
,
(
void
*
)
qInfo
);
qError
(
"QInfo:%
"
PRIu64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
else
{
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
119fcee9
此差异已折叠。
点击以展开。
src/vnode/src/vnodeRead.c
浏览文件 @
119fcee9
...
...
@@ -25,7 +25,7 @@ static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SV
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
);
static
int32_t
vnodeProcessFetchMsg
(
SVnodeObj
*
pVnode
,
SVReadMsg
*
pRead
);
static
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
void
*
qhandle
,
int32_t
vgId
);
static
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
uint64_t
qId
,
void
*
qhandle
,
int32_t
vgId
);
int32_t
vnodeInitRead
(
void
)
{
vnodeProcessReadMsgFp
[
TSDB_MSG_TYPE_QUERY
]
=
vnodeProcessQueryMsg
;
...
...
@@ -167,7 +167,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void
* @param ahandle sqlObj address at client side
* @return
*/
static
int32_t
vnodeDumpQueryResult
(
SRspRet
*
pRet
,
void
*
pVnode
,
void
**
handle
,
bool
*
freeHandle
,
void
*
ahandle
)
{
static
int32_t
vnodeDumpQueryResult
(
SRspRet
*
pRet
,
void
*
pVnode
,
uint64_t
qId
,
void
**
handle
,
bool
*
freeHandle
,
void
*
ahandle
)
{
bool
continueExec
=
false
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -183,7 +183,7 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle,
}
}
else
{
*
freeHandle
=
true
;
vTrace
(
"QInfo:%
p exec completed, free handle:%d"
,
*
handle
,
*
freeHandle
);
vTrace
(
"QInfo:%
"
PRIu64
"-%p exec completed, free handle:%d"
,
qId
,
*
handle
,
*
freeHandle
);
}
}
else
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
...
...
@@ -220,27 +220,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if
(
pRead
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
vError
(
"error rpc msg in query, %s"
,
tstrerror
(
pRead
->
code
));
}
// assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL);
// if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
// SCancelQueryMsg *pMsg = (SCancelQueryMsg *)pRead->pCont;
//// pMsg->free = htons(killQueryMsg->free);
// pMsg->qhandle = htobe64(pMsg->qhandle);
//
// vWarn("QInfo:%p connection %p broken, kill query", (void *)pMsg->qhandle, pRead->rpcHandle);
//// assert(pRead->contLen > 0 && pMsg->free == 1);
//
// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pMsg->qhandle);
// if (qhandle == NULL || *qhandle == NULL) {
// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pMsg->qhandle, pRead->rpcHandle);
// } else {
// assert(*qhandle == (void *)pMsg->qhandle);
//
// qKillQuery(*qhandle);
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
// }
//
// return TSDB_CODE_TSC_QUERY_CANCELLED;
// }
int32_t
code
=
TSDB_CODE_SUCCESS
;
void
**
handle
=
NULL
;
...
...
@@ -274,7 +253,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
if
(
handle
!=
NULL
&&
vnodeNotifyCurrentQhandle
(
pRead
->
rpcHandle
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vnodeNotifyCurrentQhandle
(
pRead
->
rpcHandle
,
qId
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%"
PRIu64
"-%p, query discarded since link is broken, %p"
,
pVnode
->
vgId
,
qId
,
*
handle
,
pRead
->
rpcHandle
);
pRsp
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
...
...
@@ -297,16 +276,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
else
{
assert
(
pCont
!=
NULL
);
void
**
qhandle
=
(
void
**
)
pRead
->
qhandle
;
uint64_t
qId
=
0
;
vTrace
(
"vgId:%d, QInfo:%p, dnode continues to exec query"
,
pVnode
->
vgId
,
*
qhandle
);
// In the retrieve blocking model, only 50% CPU will be used in query processing
if
(
tsRetrieveBlockingModel
)
{
qTableQuery
(
*
qhandle
);
// do execute query
qTableQuery
(
*
qhandle
,
&
qId
);
// do execute query
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
qhandle
,
false
);
}
else
{
bool
freehandle
=
false
;
bool
buildRes
=
qTableQuery
(
*
qhandle
);
// do execute query
bool
buildRes
=
qTableQuery
(
*
qhandle
,
&
qId
);
// do execute query
// build query rsp, the retrieve request has reached here already
if
(
buildRes
)
{
...
...
@@ -318,7 +298,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
pRead
->
rpcHandle
);
// set the real rsp error code
pRead
->
code
=
vnodeDumpQueryResult
(
&
pRead
->
rspRet
,
pVnode
,
qhandle
,
&
freehandle
,
pRead
->
rpcHandle
);
pRead
->
code
=
vnodeDumpQueryResult
(
&
pRead
->
rspRet
,
pVnode
,
q
Id
,
q
handle
,
&
freehandle
,
pRead
->
rpcHandle
);
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code
=
TSDB_CODE_QRY_HAS_RSP
;
...
...
@@ -348,32 +328,32 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
SRetrieveTableMsg
*
pRetrieve
=
pCont
;
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
pRetrieve
->
q
handle
=
htobe64
(
pRetrieve
->
qhandle
);
pRetrieve
->
q
Id
=
htobe64
(
pRetrieve
->
qId
);
vTrace
(
"vgId:%d,
QInfo:%"
PRIu64
", retrieve msg is disposed, free:%d, conn:%p"
,
pVnode
->
vgId
,
pRetrieve
->
qhandle
,
vTrace
(
"vgId:%d,
qId:%"
PRIu64
", retrieve msg is disposed, free:%d, conn:%p"
,
pVnode
->
vgId
,
pRetrieve
->
qId
,
pRetrieve
->
free
,
pRead
->
rpcHandle
);
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
terrno
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
void
**
handle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
pRetrieve
->
q
handle
);
void
**
handle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
pRetrieve
->
q
Id
);
if
(
handle
==
NULL
)
{
code
=
terrno
;
terrno
=
TSDB_CODE_SUCCESS
;
}
else
if
(
!
checkQIdEqual
(
*
handle
,
pRetrieve
->
q
handle
))
{
}
else
if
(
!
checkQIdEqual
(
*
handle
,
pRetrieve
->
q
Id
))
{
code
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, invalid
handle in retrieving result, code:%s, QInfo:%"
PRIu64
,
pVnode
->
vgId
,
tstrerror
(
code
),
pRetrieve
->
qhandle
);
vError
(
"vgId:%d, invalid
qId in retrieving result, code:%s, QInfo:%"
PRIu64
,
pVnode
->
vgId
,
tstrerror
(
code
),
pRetrieve
->
qId
);
vnodeBuildNoResultQueryRsp
(
pRet
);
return
code
;
}
// kill current query and free corresponding resources.
if
(
pRetrieve
->
free
==
1
)
{
vWarn
(
"vgId:%d, QInfo:%"
PRIu64
"-%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pRetrieve
->
q
handle
,
*
handle
);
vWarn
(
"vgId:%d, QInfo:%"
PRIu64
"-%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pRetrieve
->
q
Id
,
*
handle
);
qKillQuery
(
*
handle
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
...
...
@@ -383,7 +363,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
// register the qhandle to connect to quit query immediate if connection is broken
if
(
vnodeNotifyCurrentQhandle
(
pRead
->
rpcHandle
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
vnodeNotifyCurrentQhandle
(
pRead
->
rpcHandle
,
pRetrieve
->
qId
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%"
PRIu64
"-%p, retrieve discarded since link is broken, %p"
,
pVnode
->
vgId
,
pRetrieve
->
qhandle
,
*
handle
,
pRead
->
rpcHandle
);
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
qKillQuery
(
*
handle
);
...
...
@@ -413,7 +393,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
// ahandle is the sqlObj pointer
code
=
vnodeDumpQueryResult
(
pRet
,
pVnode
,
handle
,
&
freeHandle
,
pRead
->
rpcHandle
);
code
=
vnodeDumpQueryResult
(
pRet
,
pVnode
,
pRetrieve
->
qId
,
handle
,
&
freeHandle
,
pRead
->
rpcHandle
);
}
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
...
...
@@ -427,13 +407,13 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
void
*
qhandle
,
int32_t
vgId
)
{
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
uint64_t
qId
,
void
*
qhandle
,
int32_t
vgId
)
{
SRetrieveTableMsg
*
pMsg
=
rpcMallocCont
(
sizeof
(
SRetrieveTableMsg
));
pMsg
->
qhandle
=
htobe64
((
uint64_t
)
qhandle
);
pMsg
->
header
.
vgId
=
htonl
(
vgId
);
pMsg
->
header
.
contLen
=
htonl
(
sizeof
(
SRetrieveTableMsg
));
vTrace
(
"QInfo:%
p register qhandle to connect:%p"
,
qhandle
,
handle
);
vTrace
(
"QInfo:%
"
PRIu64
"-%p register qhandle to connect:%p"
,
qId
,
qhandle
,
handle
);
return
rpcReportProgress
(
handle
,
(
char
*
)
pMsg
,
sizeof
(
SRetrieveTableMsg
));
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录