Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
怡红公子⭕
TDengine
提交
52951dc9
T
TDengine
项目概览
怡红公子⭕
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
5
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
52951dc9
编写于
7月 03, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-225] fix error in qinfo life-cycle management.
上级
1b4fa8da
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
109 addition
and
75 deletion
+109
-75
src/inc/query.h
src/inc/query.h
+1
-1
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+2
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+68
-38
src/query/src/qast.c
src/query/src/qast.c
+1
-3
src/util/src/tcache.c
src/util/src/tcache.c
+0
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+3
-2
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+34
-29
未找到文件。
src/inc/query.h
浏览文件 @
52951dc9
...
...
@@ -44,7 +44,7 @@ void qDestroyQueryInfo(qinfo_t qinfo);
* @param qinfo
* @return
*/
void
qTableQuery
(
qinfo_t
qinfo
,
void
(
*
fp
)(
void
*
),
void
*
param
);
void
qTableQuery
(
qinfo_t
qinfo
);
/**
* Retrieve the produced results information, if current query is not paused or completed,
...
...
src/query/inc/qExecutor.h
浏览文件 @
52951dc9
...
...
@@ -197,7 +197,8 @@ typedef struct SQInfo {
*/
int32_t
tableIndex
;
int32_t
numOfGroupResultPages
;
_qinfo_free_fn_t
fn
;
_qinfo_free_fn_t
freeFn
;
jmp_buf
env
;
}
SQInfo
;
#endif // TDENGINE_QUERYEXECUTOR_H
src/query/src/qExecutor.c
浏览文件 @
52951dc9
...
...
@@ -2586,7 +2586,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
int64_t
getNumOfResultWindowRes
(
SQuery
*
pQuery
,
SWindowResult
*
pWindowRes
)
{
// int64_t maxOutput = 0;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
base
.
functionId
;
...
...
@@ -2604,15 +2603,6 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
if
(
pResultInfo
->
numOfRes
>
0
)
{
return
pResultInfo
->
numOfRes
;
}
// if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) {
// maxOutput = pResultInfo->numOfRes;
//
// if (maxOutput > 0) {
// break;
// }
// }
//
// assert(pResultInfo != NULL);
}
return
0
;
...
...
@@ -2623,12 +2613,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
size_t
size
=
taosArrayGetSize
(
pGroup
);
tFilePage
**
buffer
=
pQuery
->
sdata
;
int32_t
*
posList
=
calloc
(
size
,
sizeof
(
int32_t
));
int32_t
*
posList
=
calloc
(
size
,
sizeof
(
int32_t
));
STableQueryInfo
**
pTableList
=
malloc
(
POINTER_BYTES
*
size
);
if
(
pTableList
==
NULL
||
posList
==
NULL
)
{
tfree
(
posList
);
tfree
(
pTableList
);
qError
(
"QInfo:%p failed alloc memory"
,
pQInfo
);
longjmp
(
pQInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
// todo opt for the case of one table per group
int32_t
numOfTables
=
0
;
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
...
...
@@ -4069,7 +4066,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
return
pFillCol
;
}
int32_t
doInitQInfo
(
SQInfo
*
pQInfo
,
STSBuf
*
pTsBuf
,
void
*
tsdb
,
int32_t
vgId
,
bool
isSTableQuery
,
void
*
freeParam
,
_qinfo_free_fn_t
fn
)
{
int32_t
doInitQInfo
(
SQInfo
*
pQInfo
,
STSBuf
*
pTsBuf
,
void
*
tsdb
,
int32_t
vgId
,
bool
isSTableQuery
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
...
...
@@ -4083,8 +4080,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
vgId
=
vgId
;
pQInfo
->
param
=
freeParam
;
pQInfo
->
fn
=
fn
;
pRuntimeEnv
->
pQuery
=
pQuery
;
pRuntimeEnv
->
pTSBuf
=
pTsBuf
;
...
...
@@ -4932,14 +4927,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time
pRuntimeEnv
->
summary
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
assert
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
);
/* check if query is killed or not */
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p query is killed"
,
pQInfo
);
}
else
{
qDebug
(
"QInfo:%p query paused, %"
PRId64
" rows returned, numOfTotal:%"
PRId64
" rows"
,
pQInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
);
}
}
static
void
stableQueryImpl
(
SQInfo
*
pQInfo
)
{
...
...
@@ -4961,10 +4948,6 @@ static void stableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time
pQInfo
->
runtimeEnv
.
summary
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
if
(
pQuery
->
rec
.
rows
==
0
)
{
qDebug
(
"QInfo:%p over, %zu tables queried, %"
PRId64
" rows are returned"
,
pQInfo
,
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
,
pQuery
->
rec
.
total
);
}
}
static
int32_t
getColumnIndexInSource
(
SQueryTableMsg
*
pQueryMsg
,
SSqlFuncMsg
*
pExprMsg
,
SColumnInfo
*
pTagCols
)
{
...
...
@@ -5076,6 +5059,8 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
*/
static
int32_t
convertQueryMsg
(
SQueryTableMsg
*
pQueryMsg
,
SArray
**
pTableIdList
,
SSqlFuncMsg
***
pExpr
,
char
**
tagCond
,
char
**
tbnameCond
,
SColIndex
**
groupbyCols
,
SColumnInfo
**
tagCols
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
pQueryMsg
->
numOfTables
=
htonl
(
pQueryMsg
->
numOfTables
);
pQueryMsg
->
window
.
skey
=
htobe64
(
pQueryMsg
->
window
.
skey
);
...
...
@@ -5102,7 +5087,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
// query msg safety check
if
(
!
validateQueryMsg
(
pQueryMsg
))
{
return
TSDB_CODE_QRY_INVALID_MSG
;
code
=
TSDB_CODE_QRY_INVALID_MSG
;
goto
_cleanup
;
}
char
*
pMsg
=
(
char
*
)(
pQueryMsg
->
colList
)
+
sizeof
(
SColumnInfo
)
*
pQueryMsg
->
numOfCols
;
...
...
@@ -5174,7 +5160,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
int16_t
functionId
=
pExprMsg
->
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_TAGPRJ
||
functionId
==
TSDB_FUNC_TAG_DUMMY
)
{
if
(
pExprMsg
->
colInfo
.
flag
!=
TSDB_COL_TAG
)
{
// ignore the column index check for arithmetic expression.
return
TSDB_CODE_QRY_INVALID_MSG
;
code
=
TSDB_CODE_QRY_INVALID_MSG
;
goto
_cleanup
;
}
}
else
{
// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
...
...
@@ -5186,6 +5173,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
if
(
!
validateQuerySourceCols
(
pQueryMsg
,
*
pExpr
))
{
code
=
TSDB_CODE_QRY_INVALID_MSG
;
goto
_cleanup
;
}
...
...
@@ -5193,6 +5181,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if
(
pQueryMsg
->
numOfGroupCols
>
0
)
{
// group by tag columns
*
groupbyCols
=
malloc
(
pQueryMsg
->
numOfGroupCols
*
sizeof
(
SColIndex
));
if
(
*
groupbyCols
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_cleanup
;
}
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfGroupCols
;
++
i
)
{
(
*
groupbyCols
)[
i
].
colId
=
*
(
int16_t
*
)
pMsg
;
...
...
@@ -5248,7 +5240,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if
(
*
pMsg
!=
0
)
{
size_t
len
=
strlen
(
pMsg
)
+
1
;
*
tbnameCond
=
malloc
(
len
);
if
(
*
tbnameCond
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_cleanup
;
}
strcpy
(
*
tbnameCond
,
pMsg
);
pMsg
+=
len
;
}
...
...
@@ -5258,7 +5256,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg
,
pQueryMsg
->
numOfTables
,
pQueryMsg
->
queryType
,
pQueryMsg
->
window
.
skey
,
pQueryMsg
->
window
.
ekey
,
pQueryMsg
->
numOfGroupCols
,
pQueryMsg
->
order
,
pQueryMsg
->
numOfOutput
,
pQueryMsg
->
numOfCols
,
pQueryMsg
->
intervalTime
,
pQueryMsg
->
fillType
,
pQueryMsg
->
tsLen
,
pQueryMsg
->
tsNumOfBlocks
,
pQueryMsg
->
limit
,
pQueryMsg
->
offset
);
return
0
;
return
TSDB_CODE_SUCCESS
;
_cleanup:
tfree
(
*
pExpr
);
...
...
@@ -5268,7 +5267,8 @@ _cleanup:
tfree
(
*
groupbyCols
);
tfree
(
*
tagCols
);
tfree
(
*
tagCond
);
return
TSDB_CODE_QRY_INVALID_MSG
;
return
code
;
}
static
int32_t
buildAirthmeticExprFromMsg
(
SExprInfo
*
pArithExprInfo
,
SQueryTableMsg
*
pQueryMsg
)
{
...
...
@@ -5669,7 +5669,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery
->
window
=
pQueryMsg
->
window
;
if
(
sem_init
(
&
pQInfo
->
dataReady
,
0
,
0
)
!=
0
)
{
qError
(
"QInfo:%p init dataReady sem failed, reason:%s"
,
pQInfo
,
strerror
(
errno
));
int32_t
code
=
TAOS_SYSTEM_ERROR
(
errno
);
qError
(
"QInfo:%p init dataReady sem failed, reason:%s"
,
pQInfo
,
tstrerror
(
code
));
goto
_cleanup
;
}
...
...
@@ -5680,7 +5681,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
_cleanup:
freeQInfo
(
pQInfo
);
return
NULL
;
}
...
...
@@ -5722,6 +5722,9 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
return
TSDB_CODE_SUCCESS
;
}
pQInfo
->
param
=
param
;
pQInfo
->
freeFn
=
fn
;
if
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:%p no table qualified for tag filter, abort query"
,
pQInfo
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
...
...
@@ -5731,7 +5734,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
}
// filter the qualified
if
((
code
=
doInitQInfo
(
pQInfo
,
pTSBuf
,
tsdb
,
vgId
,
isSTable
,
param
,
fn
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
doInitQInfo
(
pQInfo
,
pTSBuf
,
tsdb
,
vgId
,
isSTable
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -6031,19 +6034,19 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
qDebug
(
"QInfo:%p dec refCount, value:%d"
,
pQInfo
,
ref
);
if
(
ref
==
0
)
{
_qinfo_free_fn_t
f
n
=
pQInfo
->
f
n
;
_qinfo_free_fn_t
f
reeFp
=
pQInfo
->
freeF
n
;
void
*
param
=
pQInfo
->
param
;
doDestoryQueryInfo
(
pQInfo
);
if
(
f
n
!=
NULL
)
{
if
(
f
reeFp
!=
NULL
)
{
assert
(
param
!=
NULL
);
f
n
(
param
);
f
reeFp
(
param
);
}
}
}
void
qTableQuery
(
qinfo_t
qinfo
,
void
(
*
fp
)(
void
*
),
void
*
param
)
{
void
qTableQuery
(
qinfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
pQInfo
->
signature
!=
pQInfo
)
{
...
...
@@ -6053,17 +6056,34 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p it is already killed, abort"
,
pQInfo
);
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
);
return
;
}
if
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:%p no table exists for query, abort"
,
pQInfo
);
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
);
return
;
}
int32_t
ret
=
setjmp
(
pQInfo
->
env
);
// error occurs, record the error code and return to client
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pQInfo
->
code
=
ret
;
qDebug
(
"QInfo:%p query abort due to error occurs, code:%s"
,
pQInfo
,
tstrerror
(
pQInfo
->
code
));
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
);
return
;
}
qDebug
(
"QInfo:%p query task is launched"
,
pQInfo
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
if
(
onlyQueryTags
(
pQInfo
->
runtimeEnv
.
pQuery
))
{
assert
(
pQInfo
->
runtimeEnv
.
pQueryHandle
==
NULL
);
buildTagQueryResult
(
pQInfo
);
// todo support the limit/offset
...
...
@@ -6073,6 +6093,16 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
tableQueryImpl
(
pQInfo
);
}
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p query is killed"
,
pQInfo
);
}
else
if
(
pQuery
->
rec
.
rows
==
0
)
{
qDebug
(
"QInfo:%p over, %zu tables queried, %"
PRId64
" rows are returned"
,
pQInfo
,
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
,
pQuery
->
rec
.
total
);
}
else
{
qDebug
(
"QInfo:%p query paused, %"
PRId64
" rows returned, numOfTotal:%"
PRId64
" rows"
,
pQInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
);
}
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
);
}
...
...
src/query/src/qast.c
浏览文件 @
52951dc9
...
...
@@ -1173,9 +1173,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
size_t
len
=
strlen
(
cond
)
+
VARSTR_HEADER_SIZE
;
char
*
p
=
exception_malloc
(
len
);
varDataSetLen
(
p
,
len
-
VARSTR_HEADER_SIZE
);
memcpy
(
varDataVal
(
p
),
cond
,
len
);
STR_WITH_SIZE_TO_VARSTR
(
p
,
cond
,
len
-
VARSTR_HEADER_SIZE
);
taosArrayPush
(
pVal
->
arr
,
&
p
);
}
...
...
src/util/src/tcache.c
浏览文件 @
52951dc9
...
...
@@ -547,7 +547,6 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
pCacheObj
->
freeFp
(
pElem
->
pData
->
data
);
}
uError
(
"-------------------free obj:%p"
,
pElem
->
pData
);
free
(
pElem
->
pData
);
free
(
pElem
);
}
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
52951dc9
...
...
@@ -324,7 +324,7 @@ void vnodeRelease(void *pVnodeRaw) {
assert
(
refCount
>=
0
);
if
(
refCount
>
0
)
{
v
Trace
(
"vgId:%d, release vnode, refCount:%d"
,
vgId
,
refCount
);
v
Debug
(
"vgId:%d, release vnode, refCount:%d"
,
vgId
,
refCount
);
return
;
}
...
...
@@ -388,7 +388,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
if
(
pVnode
==
NULL
)
return
pVnode
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
v
Trace
(
"vgId:%d, get vnode, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
v
Debug
(
"vgId:%d, get vnode, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
return
pVnode
;
}
...
...
@@ -466,6 +466,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace
(
"vgId:%d, vnode will cleanup, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
// release local resources only after cutting off outside connections
taosCacheCleanup
(
pVnode
->
qHandlePool
);
vnodeRelease
(
pVnode
);
}
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
52951dc9
...
...
@@ -78,15 +78,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// this message arrived here by means of the *query* message, so release the vnode is necessary
void
**
qhandle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
(
void
*
)
&
killQueryMsg
->
qhandle
,
sizeof
(
killQueryMsg
->
qhandle
));
if
(
qhandle
==
NULL
||
*
qhandle
==
NULL
)
{
// todo handle invalid qhandle error
if
(
qhandle
==
NULL
||
*
qhandle
==
NULL
)
{
vWarn
(
"QInfo:%p invalid qhandle, no matched query handle, conn:%p"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pReadMsg
->
rpcMsg
.
handle
);
}
else
{
// qKillQuery((qinfo_t) killQueryMsg->qhandle);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
qhandle
,
true
);
}
vnodeRelease
(
pVnode
);
return
TSDB_CODE_TSC_QUERY_CANCELLED
;
// todo change the error code
return
TSDB_CODE_TSC_QUERY_CANCELLED
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -97,8 +96,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code
=
qCreateQueryInfo
(
pVnode
->
tsdb
,
pVnode
->
vgId
,
pQueryTableMsg
,
pVnode
,
vnodeRelease
,
&
pQInfo
);
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
pQInfo
))
;
pRsp
->
code
=
code
;
pRsp
->
code
=
code
;
pRsp
->
qhandle
=
0
;
pRet
->
len
=
sizeof
(
SQueryTableRsp
);
pRet
->
rsp
=
pRsp
;
...
...
@@ -120,6 +119,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
handle
=
taosCachePut
(
pVnode
->
qHandlePool
,
pQInfo
,
sizeof
(
pQInfo
),
&
pQInfo
,
sizeof
(
pQInfo
),
tsShellActivityTimer
*
2
);
assert
(
*
handle
==
pQInfo
);
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
handle
));
}
else
{
assert
(
pQInfo
==
NULL
);
vnodeRelease
(
pVnode
);
...
...
@@ -128,13 +128,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
pVnode
->
vgId
,
pQInfo
);
}
else
{
assert
(
pCont
!=
NULL
);
pQInfo
=
pCont
;
pQInfo
=
*
(
void
**
)(
pCont
);
handle
=
pCont
;
code
=
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg in progress"
,
pVnode
->
vgId
,
pQInfo
);
}
if
(
pQInfo
!=
NULL
)
{
qTableQuery
(
pQInfo
,
vnodeRelease
,
pVnode
);
// do execute query
qTableQuery
(
pQInfo
);
// do execute query
assert
(
handle
!=
NULL
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
false
);
}
...
...
@@ -146,56 +150,57 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRspRet
*
pRet
=
&
pReadMsg
->
rspRet
;
SRetrieveTableMsg
*
pRetrieve
=
pCont
;
void
*
pQInfo
=
(
void
*
)
htobe64
(
pRetrieve
->
qhandle
);
void
*
*
pQInfo
=
(
void
*
)
htobe64
(
pRetrieve
->
qhandle
);
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is disposed"
,
pVnode
->
vgId
,
*
pQInfo
);
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
int32_t
ret
=
0
;
void
**
handle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
&
pQInfo
,
sizeof
(
pQInfo
));
if
(
handle
==
NULL
||
*
handle
!=
pQInfo
)
{
void
**
handle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
pQInfo
,
sizeof
(
pQInfo
));
if
(
handle
==
NULL
||
handle
!=
pQInfo
)
{
ret
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
if
(
pRetrieve
->
free
==
1
)
{
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pQInfo
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
handle
,
true
);
// int32_t ret = qKillQuery(pQInfo);
if
(
ret
==
TSDB_CODE_SUCCESS
)
{
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pQInfo
);
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
true
);
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
SRetrieveTableRsp
*
pRsp
=
pRet
->
rsp
;
pRsp
->
numOfRows
=
0
;
pRsp
->
completed
=
true
;
pRsp
->
useconds
=
0
;
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
SRetrieveTableRsp
*
pRsp
=
pRet
->
rsp
;
pRsp
->
numOfRows
=
0
;
pRsp
->
completed
=
true
;
pRsp
->
useconds
=
0
;
}
else
{
// todo handle error
}
return
ret
;
}
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is received"
,
pVnode
->
vgId
,
pQInfo
);
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is received"
,
pVnode
->
vgId
,
*
pQInfo
);
int32_t
code
=
qRetrieveQueryResultInfo
(
pQInfo
);
int32_t
code
=
qRetrieveQueryResultInfo
(
*
pQInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
//TODO
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
}
else
{
// todo check code and handle error in build result set
code
=
qDumpRetrieveResult
(
pQInfo
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
);
code
=
qDumpRetrieveResult
(
*
pQInfo
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
);
if
(
qHasMoreResultsToRetrieve
(
pQInfo
))
{
pRet
->
qhandle
=
pQInfo
;
if
(
qHasMoreResultsToRetrieve
(
*
pQInfo
))
{
pRet
->
qhandle
=
handle
;
code
=
TSDB_CODE_VND_ACTION_NEED_REPROCESSED
;
}
else
{
// no further execution invoked, release the ref to vnode
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
true
);
// qDestroyQueryInfo(pQInfo);
}
}
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is disposed"
,
pVnode
->
vgId
,
pQInfo
);
return
code
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录