Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e8bc9be6
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e8bc9be6
编写于
4月 22, 2021
作者:
H
haojun Liao
提交者:
GitHub
4月 22, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #5880 from taosdata/feature/qrefactor
Feature/qrefactor
上级
f2791c36
8405b578
变更
15
展开全部
隐藏空白更改
内联
并排
Showing
15 changed file
with
272 addition
and
270 deletion
+272
-270
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+8
-8
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+7
-7
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+2
-2
src/client/src/tscProfile.c
src/client/src/tscProfile.c
+5
-5
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+6
-6
src/client/src/tscServer.c
src/client/src/tscServer.c
+35
-34
src/client/src/tscSql.c
src/client/src/tscSql.c
+11
-11
src/client/src/tscStream.c
src/client/src/tscStream.c
+15
-15
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+76
-75
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+12
-12
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+42
-42
src/query/src/qResultbuf.c
src/query/src/qResultbuf.c
+3
-3
src/query/src/queryMain.c
src/query/src/queryMain.c
+18
-18
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+26
-26
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+6
-6
未找到文件。
src/client/src/tscAsync.c
浏览文件 @
e8bc9be6
...
...
@@ -57,7 +57,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
strntolower
(
pSql
->
sqlstr
,
sqlstr
,
(
int32_t
)
sqlLen
);
tscDebugL
(
"
%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
tscDebugL
(
"
0x%"
PRIx64
" SQL: %s"
,
pSql
->
self
,
pSql
->
sqlstr
);
pCmd
->
curSql
=
pSql
->
sqlstr
;
int32_t
code
=
tsParseSql
(
pSql
,
true
);
...
...
@@ -283,7 +283,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
static
void
tscAsyncResultCallback
(
SSchedMsg
*
pMsg
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
taosAcquireRef
(
tscObjRef
,
(
int64_t
)
pMsg
->
ahandle
);
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
tscDebug
(
"
%p SqlObj is freed, not add into queue async res"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" SqlObj is freed, not add into queue async res"
,
pSql
->
self
);
return
;
}
...
...
@@ -372,13 +372,13 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto
_error
;
}
tscDebug
(
"
%p get %s successfully"
,
pSql
,
msg
);
tscDebug
(
"
0x%"
PRIx64
" get %s successfully"
,
pSql
->
self
,
msg
);
if
(
pSql
->
pStream
==
NULL
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
// check if it is a sub-query of super table query first, if true, enter another routine
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
(
TSDB_QUERY_TYPE_STABLE_SUBQUERY
|
TSDB_QUERY_TYPE_SUBQUERY
|
TSDB_QUERY_TYPE_TAG_FILTER_QUERY
)))
{
tscDebug
(
"
%p update local table meta, continue to process sql and send the corresponding query"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" update local table meta, continue to process sql and send the corresponding query"
,
pSql
->
self
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -402,7 +402,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return
;
}
else
{
// continue to process normal async query
if
(
pCmd
->
parseFinished
)
{
tscDebug
(
"
%p update local table meta, continue to process sql and send corresponding query"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" update local table meta, continue to process sql and send corresponding query"
,
pSql
->
self
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
code
=
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
...
...
@@ -416,7 +416,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert
(
pCmd
->
command
!=
TSDB_SQL_INSERT
);
if
(
pCmd
->
command
==
TSDB_SQL_SELECT
)
{
tscDebug
(
"
%p redo parse sql string and proceed"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" redo parse sql string and proceed"
,
pSql
->
self
);
pCmd
->
parseFinished
=
false
;
tscResetSqlCmd
(
pCmd
,
true
);
...
...
@@ -436,7 +436,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
return
;
}
else
{
tscDebug
(
"
%p continue parse sql after get table meta"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" continue parse sql after get table meta"
,
pSql
->
self
);
code
=
tsParseSql
(
pSql
,
false
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
...
...
@@ -486,7 +486,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
}
tscDebug
(
"
%p stream:%p meta is updated, start new query, command:%d"
,
pSql
,
pSql
->
pStream
,
pSql
->
cmd
.
command
);
tscDebug
(
"
0x%"
PRIx64
" stream:%p meta is updated, start new query, command:%d"
,
pSql
->
self
,
pSql
->
pStream
,
pSql
->
cmd
.
command
);
if
(
!
pSql
->
cmd
.
parseFinished
)
{
tsParseSql
(
pSql
,
false
);
}
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
e8bc9be6
...
...
@@ -193,7 +193,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
for
(
int32_t
i
=
0
;
i
<
numOfBuffer
;
++
i
)
{
int32_t
len
=
pMemBuffer
[
i
]
->
fileMeta
.
flushoutData
.
nLength
;
if
(
len
==
0
)
{
tscDebug
(
"
%p no data retrieved from orderOfVnode:%d"
,
pSql
,
i
+
1
);
tscDebug
(
"
0x%"
PRIx64
" no data retrieved from orderOfVnode:%d"
,
pSql
->
self
,
i
+
1
);
continue
;
}
...
...
@@ -203,7 +203,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
if
(
numOfFlush
==
0
||
numOfBuffer
==
0
)
{
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
pCmd
->
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
// no result, set the result empty
tscDebug
(
"
%p retrieved no data"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" retrieved no data"
,
pSql
->
self
);
return
;
}
...
...
@@ -235,7 +235,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pReducer
->
numOfVnode
=
numOfBuffer
;
pReducer
->
pDesc
=
pDesc
;
tscDebug
(
"
%p the number of merged leaves is: %d"
,
pSql
,
pReducer
->
numOfBuffer
);
tscDebug
(
"
0x%"
PRIx64
" the number of merged leaves is: %d"
,
pSql
->
self
,
pReducer
->
numOfBuffer
);
int32_t
idx
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfBuffer
;
++
i
)
{
...
...
@@ -258,7 +258,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
ds
->
pageId
=
0
;
ds
->
rowIdx
=
0
;
tscDebug
(
"
%p load data from disk into memory, orderOfVnode:%d, total:%d"
,
pSql
,
i
+
1
,
idx
+
1
);
tscDebug
(
"
0x%"
PRIx64
" load data from disk into memory, orderOfVnode:%d, total:%d"
,
pSql
->
self
,
i
+
1
,
idx
+
1
);
tExtMemBufferLoadData
(
pMemBuffer
[
i
],
&
(
ds
->
filePage
),
j
,
0
);
#ifdef _DEBUG_VIEW
printf
(
"load data page into mem for build loser tree: %"
PRIu64
" rows
\n
"
,
ds
->
filePage
.
num
);
...
...
@@ -272,7 +272,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
#endif
if
(
ds
->
filePage
.
num
==
0
)
{
// no data in this flush, the index does not increase
tscDebug
(
"
%p flush data is empty, ignore %d flush record"
,
pSql
,
idx
);
tscDebug
(
"
0x%"
PRIx64
" flush data is empty, ignore %d flush record"
,
pSql
->
self
,
idx
);
tfree
(
ds
);
continue
;
}
...
...
@@ -547,10 +547,10 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
pLocalMerge
->
numOfCompleted
=
0
;
free
(
pLocalMerge
);
}
else
{
tscDebug
(
"
%p already freed or another free function is invoked"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" already freed or another free function is invoked"
,
pSql
->
self
);
}
tscDebug
(
"
%p free local reducer finished"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" free local reducer finished"
,
pSql
->
self
);
}
static
int32_t
createOrderDescriptor
(
tOrderDescriptor
**
pOrderDesc
,
SSqlCmd
*
pCmd
,
SColumnModel
*
pModel
)
{
...
...
src/client/src/tscParseInsert.c
浏览文件 @
e8bc9be6
...
...
@@ -1089,7 +1089,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
str
=
pCmd
->
curSql
;
}
tscDebug
(
"
%p create data block list hashList:%p"
,
pSql
,
pCmd
->
pTableBlockHashList
);
tscDebug
(
"
0x%"
PRIx64
" create data block list hashList:%p"
,
pSql
->
self
,
pCmd
->
pTableBlockHashList
);
while
(
1
)
{
int32_t
index
=
0
;
...
...
@@ -1303,7 +1303,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
((
!
pCmd
->
parseFinished
)
&&
(
!
initial
))
{
tscDebug
(
"
%p resume to parse sql: %s"
,
pSql
,
pCmd
->
curSql
);
tscDebug
(
"
0x%"
PRIx64
" resume to parse sql: %s"
,
pSql
->
self
,
pCmd
->
curSql
);
}
ret
=
tscAllocPayload
(
&
pSql
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
);
...
...
src/client/src/tscProfile.c
浏览文件 @
e8bc9be6
...
...
@@ -61,7 +61,7 @@ void tscAddIntoSqlList(SSqlObj *pSql) {
pSql
->
stime
=
taosGetTimestampMs
();
pSql
->
listed
=
1
;
tscDebug
(
"
%p added into sqlList"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" added into sqlList"
,
pSql
->
self
);
}
void
tscSaveSlowQueryFpCb
(
void
*
param
,
TAOS_RES
*
result
,
int
code
)
{
...
...
@@ -99,7 +99,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
return
;
}
tscDebug
(
"
%p query time:%"
PRId64
" sql:%s"
,
pSql
,
pSql
->
res
.
useconds
,
pSql
->
sqlstr
);
tscDebug
(
"
0x%"
PRIx64
" query time:%"
PRId64
" sql:%s"
,
pSql
->
self
,
pSql
->
res
.
useconds
,
pSql
->
sqlstr
);
int32_t
sqlSize
=
(
int32_t
)(
TSDB_SLOW_QUERY_SQL_LEN
+
size
);
char
*
sql
=
malloc
(
sqlSize
);
...
...
@@ -141,7 +141,7 @@ void tscRemoveFromSqlList(SSqlObj *pSql) {
pSql
->
listed
=
0
;
tscSaveSlowQuery
(
pSql
);
tscDebug
(
"
%p removed from sqlList"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" removed from sqlList"
,
pSql
->
self
);
}
void
tscKillQuery
(
STscObj
*
pObj
,
uint32_t
killId
)
{
...
...
@@ -158,7 +158,7 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) {
if
(
pSql
==
NULL
)
{
tscError
(
"failed to kill query, id:%d, it may have completed/terminated"
,
killId
);
}
else
{
tscDebug
(
"
%p query is killed, queryId:%d"
,
pSql
,
killId
);
tscDebug
(
"
0x%"
PRIx64
" query is killed, queryId:%d"
,
pSql
->
self
,
killId
);
taos_stop_query
(
pSql
);
}
}
...
...
@@ -213,7 +213,7 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
pthread_mutex_unlock
(
&
pObj
->
mutex
);
if
(
pStream
)
{
tscDebug
(
"
%p stream:%p is killed, streamId:%d"
,
pStream
->
pSql
,
pStream
,
killId
);
tscDebug
(
"
0x%"
PRIx64
" stream:%p is killed, streamId:%d"
,
pStream
->
pSql
->
self
,
pStream
,
killId
);
if
(
pStream
->
callback
)
{
pStream
->
callback
(
pStream
->
param
);
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
e8bc9be6
...
...
@@ -5508,15 +5508,15 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
pQueryInfo
->
clauseLimit
=
pQueryInfo
->
limit
.
limit
;
pQueryInfo
->
slimit
=
pQuerySqlNode
->
slimit
;
tscDebug
(
"
%p limit:%"
PRId64
", offset:%"
PRId64
" slimit:%"
PRId64
", soffset:%"
PRId64
,
pSql
,
pQueryInfo
->
limit
.
limit
,
pQueryInfo
->
limit
.
offset
,
pQueryInfo
->
slimit
.
limit
,
pQueryInfo
->
slimit
.
offset
);
tscDebug
(
"
0x%"
PRIx64
" limit:%"
PRId64
", offset:%"
PRId64
" slimit:%"
PRId64
", soffset:%"
PRId64
,
pSql
->
self
,
pQueryInfo
->
limit
.
limit
,
pQueryInfo
->
limit
.
offset
,
pQueryInfo
->
slimit
.
limit
,
pQueryInfo
->
slimit
.
offset
);
if
(
pQueryInfo
->
slimit
.
offset
<
0
||
pQueryInfo
->
limit
.
offset
<
0
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg0
);
}
if
(
pQueryInfo
->
limit
.
limit
==
0
)
{
tscDebug
(
"
%p limit 0, no output result"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" limit 0, no output result"
,
pSql
->
self
);
pQueryInfo
->
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -5538,7 +5538,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
}
if
(
pQueryInfo
->
slimit
.
limit
==
0
)
{
tscDebug
(
"
%p slimit 0, no output result"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" slimit 0, no output result"
,
pSql
->
self
);
pQueryInfo
->
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -5556,7 +5556,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
// No tables included. No results generated. Query results are empty.
if
(
pTableMetaInfo
->
vgroupList
->
numOfVgroups
==
0
)
{
tscDebug
(
"
%p no table in super table, no output result"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" no table in super table, no output result"
,
pSql
->
self
);
pQueryInfo
->
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -6351,7 +6351,7 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) {
assert
(
offset
<
totalBufSize
);
str
[
offset
]
=
']'
;
assert
(
offset
<
totalBufSize
);
tscDebug
(
"
%p select clause:%s"
,
pSql
,
str
);
tscDebug
(
"
0x%"
PRIx64
" select clause:%s"
,
pSql
->
self
,
str
);
}
int32_t
doCheckForCreateTable
(
SSqlObj
*
pSql
,
int32_t
subClauseIndex
,
SSqlInfo
*
pInfo
)
{
...
...
src/client/src/tscServer.c
浏览文件 @
e8bc9be6
...
...
@@ -147,7 +147,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
// Update the local cached epSet info cached by SqlObj
int32_t
inUse
=
pSql
->
epSet
.
inUse
;
tscDumpEpSetFromVgroupInfo
(
&
pSql
->
epSet
,
&
vgroupInfo
);
tscDebug
(
"
%p update the epSet in SqlObj, in use before:%d, after:%d"
,
pSql
,
inUse
,
pSql
->
epSet
.
inUse
);
tscDebug
(
"
0x%"
PRIx64
" update the epSet in SqlObj, in use before:%d, after:%d"
,
pSql
->
self
,
inUse
,
pSql
->
epSet
.
inUse
);
}
...
...
@@ -245,11 +245,11 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if
(
pObj
->
hbrid
!=
0
)
{
int32_t
waitingDuring
=
tsShellActivityTimer
*
500
;
tscDebug
(
"
%p send heartbeat in %dms"
,
pSql
,
waitingDuring
);
tscDebug
(
"
0x%"
PRIx64
" send heartbeat in %dms"
,
pSql
->
self
,
waitingDuring
);
taosTmrReset
(
tscProcessActivityTimer
,
waitingDuring
,
(
void
*
)
pObj
->
rid
,
tscTmr
,
&
pObj
->
pTimer
);
}
else
{
tscDebug
(
"
%p start to close tscObj:%p, not send heartbeat again"
,
pSql
,
pObj
);
tscDebug
(
"
0x%"
PRIx64
" start to close tscObj:%p, not send heartbeat again"
,
pSql
->
self
,
pObj
);
}
}
...
...
@@ -326,7 +326,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
pSql
->
rpcRid
=
-
1
;
if
(
pObj
->
signature
!=
pObj
)
{
tscDebug
(
"
%p DB connection is closed, cmd:%d pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pObj
,
pObj
->
signature
);
tscDebug
(
"
0x%"
PRIx64
" DB connection is closed, cmd:%d pObj:%p signature:%p"
,
pSql
->
self
,
pCmd
->
command
,
pObj
,
pObj
->
signature
);
taosRemoveRef
(
tscObjRef
,
handle
);
taosReleaseRef
(
tscObjRef
,
handle
);
...
...
@@ -336,8 +336,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
if
(
pQueryInfo
!=
NULL
&&
pQueryInfo
->
type
==
TSDB_QUERY_TYPE_FREE_RESOURCE
)
{
tscDebug
(
"
%p
sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pQueryInfo
->
type
,
pObj
,
pObj
->
signature
);
tscDebug
(
"
0x%"
PRIx64
"
sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p"
,
pSql
->
self
,
pCmd
->
command
,
pQueryInfo
->
type
,
pObj
,
pObj
->
signature
);
taosRemoveRef
(
tscObjRef
,
handle
);
taosReleaseRef
(
tscObjRef
,
handle
);
...
...
@@ -396,13 +396,13 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
pRes
->
rspLen
=
0
;
if
(
pRes
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
)
{
tscDebug
(
"
%p query is cancelled, code:%s"
,
pSql
,
tstrerror
(
pRes
->
code
));
tscDebug
(
"
0x%"
PRIx64
" query is cancelled, code:%s"
,
pSql
->
self
,
tstrerror
(
pRes
->
code
));
}
else
{
pRes
->
code
=
rpcMsg
->
code
;
}
if
(
pRes
->
code
==
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"
%p reset retry counter to be 0 due to success rsp, old:%d"
,
pSql
,
pSql
->
retry
);
tscDebug
(
"
0x%"
PRIx64
" reset retry counter to be 0 due to success rsp, old:%d"
,
pSql
->
self
,
pSql
->
retry
);
pSql
->
retry
=
0
;
}
...
...
@@ -437,10 +437,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
pMsg
->
numOfFailedBlocks
=
htonl
(
pMsg
->
numOfFailedBlocks
);
pRes
->
numOfRows
+=
pMsg
->
affectedRows
;
tscDebug
(
"
%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d"
,
pSql
,
sqlCmd
[
pCmd
->
command
],
tscDebug
(
"
0x%"
PRIx64
" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d"
,
pSql
->
self
,
sqlCmd
[
pCmd
->
command
],
tstrerror
(
pRes
->
code
),
pMsg
->
affectedRows
,
pRes
->
rspLen
);
}
else
{
tscDebug
(
"
%p SQL cmd:%s, code:%s rspLen:%d"
,
pSql
,
sqlCmd
[
pCmd
->
command
],
tstrerror
(
pRes
->
code
),
pRes
->
rspLen
);
tscDebug
(
"
0x%"
PRIx64
" SQL cmd:%s, code:%s rspLen:%d"
,
pSql
->
self
,
sqlCmd
[
pCmd
->
command
],
tstrerror
(
pRes
->
code
),
pRes
->
rspLen
);
}
}
...
...
@@ -461,7 +461,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if
(
shouldFree
)
{
// in case of table-meta/vgrouplist query, automatically free it
taosRemoveRef
(
tscObjRef
,
handle
);
tscDebug
(
"
%p sqlObj is automatically freed"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" sqlObj is automatically freed"
,
pSql
->
self
);
}
taosReleaseRef
(
tscObjRef
,
handle
);
...
...
@@ -522,7 +522,7 @@ int tscProcessSql(SSqlObj *pSql) {
assert
((
pQueryInfo
->
numOfTables
==
0
&&
pQueryInfo
->
command
==
TSDB_SQL_HB
)
||
pQueryInfo
->
numOfTables
>
0
);
}
tscDebug
(
"
%p SQL cmd:%s will be processed, name:%s, type:%d"
,
pSql
,
sqlCmd
[
pCmd
->
command
],
name
,
type
);
tscDebug
(
"
0x%"
PRIx64
" SQL cmd:%s will be processed, name:%s, type:%d"
,
pSql
->
self
,
sqlCmd
[
pCmd
->
command
],
name
,
type
);
if
(
pCmd
->
command
<
TSDB_SQL_MGMT
)
{
// the pTableMetaInfo cannot be NULL
if
(
pTableMetaInfo
==
NULL
)
{
pSql
->
res
.
code
=
TSDB_CODE_TSC_APP_ERROR
;
...
...
@@ -562,11 +562,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
pRetrieveMsg
->
header
.
vgId
=
htonl
(
vgId
);
tscDebug
(
"
%p build fetch msg from vgId:%d, vgIndex:%d, qId:%"
PRIu64
,
pSql
,
vgId
,
vgIndex
,
pSql
->
res
.
qId
);
tscDebug
(
"
0x%"
PRIx64
" build fetch msg from vgId:%d, vgIndex:%d, qId:0x%"
PRIx64
,
pSql
->
self
,
vgId
,
vgIndex
,
pSql
->
res
.
qId
);
}
else
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d, qId:%"
PRIu64
,
pSql
,
pTableMeta
->
vgId
,
pSql
->
res
.
qId
);
tscDebug
(
"0x%"
PRIx64
" build fetch msg from only one vgroup, vgId:%d, qId:0x%"
PRIx64
,
pSql
->
self
,
pTableMeta
->
vgId
,
pSql
->
res
.
qId
);
}
pSql
->
cmd
.
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
...
...
@@ -605,7 +606,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
taosHashGetClone
(
tscVgroupMap
,
&
pTableMeta
->
vgId
,
sizeof
(
pTableMeta
->
vgId
),
NULL
,
&
vgroupInfo
,
sizeof
(
SNewVgroupInfo
));
tscDumpEpSetFromVgroupInfo
(
&
pSql
->
epSet
,
&
vgroupInfo
);
tscDebug
(
"
%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
,
pTableMeta
->
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
tscDebug
(
"
0x%"
PRIx64
" build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
->
self
,
pTableMeta
->
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
pSql
->
epSet
.
numOfEps
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -671,7 +672,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
vgId
=
pVgroupInfo
->
vgId
;
tscSetDnodeEpSet
(
&
pSql
->
epSet
,
pVgroupInfo
);
tscDebug
(
"
%p query on stable, vgIndex:%d, numOfVgroups:%d"
,
pSql
,
index
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
tscDebug
(
"
0x%"
PRIx64
" query on stable, vgIndex:%d, numOfVgroups:%d"
,
pSql
->
self
,
index
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
}
else
{
vgId
=
pTableMeta
->
vgId
;
...
...
@@ -704,7 +705,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pTableIdList
->
itemList
);
pQueryMsg
->
numOfTables
=
htonl
(
numOfTables
);
// set the number of tables
tscDebug
(
"
%p query on stable, vgId:%d, numOfTables:%d, vgIndex:%d, numOfVgroups:%d"
,
pSql
,
tscDebug
(
"
0x%"
PRIx64
" query on stable, vgId:%d, numOfTables:%d, vgIndex:%d, numOfVgroups:%d"
,
pSql
->
self
,
pTableIdList
->
vgInfo
.
vgId
,
numOfTables
,
index
,
numOfVgroups
);
// serialize each table id info
...
...
@@ -722,7 +723,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
char
n
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
n
);
tscDebug
(
"
%p vgId:%d, query on table:%s, tid:%d, uid:%"
PRIu64
,
pSql
,
htonl
(
pQueryMsg
->
head
.
vgId
),
n
,
pTableMeta
->
id
.
tid
,
pTableMeta
->
id
.
uid
);
tscDebug
(
"
0x%"
PRIx64
" vgId:%d, query on table:%s, tid:%d, uid:%"
PRIu64
,
pSql
->
self
,
htonl
(
pQueryMsg
->
head
.
vgId
),
n
,
pTableMeta
->
id
.
tid
,
pTableMeta
->
id
.
uid
);
return
pMsg
;
}
...
...
@@ -1139,7 +1140,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t
msgLen
=
(
int32_t
)(
pMsg
-
pCmd
->
payload
);
tscDebug
(
"
%p msg built success, len:%d bytes"
,
pSql
,
msgLen
);
tscDebug
(
"
0x%"
PRIx64
" msg built success, len:%d bytes"
,
pSql
->
self
,
msgLen
);
pCmd
->
payloadLen
=
msgLen
;
pSql
->
cmd
.
msgType
=
TSDB_MSG_TYPE_QUERY
;
...
...
@@ -1837,7 +1838,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
tscDebug("
%p build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql
, pCmd->count,
tscDebug("
0x%"PRIx64" build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql->self
, pCmd->count,
pCmd->payloadLen);
return pCmd->payloadLen;
...
...
@@ -2017,7 +2018,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
}
}
tscDebug
(
"
%p recv table meta, uid:%"
PRIu64
", tid:%d, name:%s"
,
pSql
,
pTableMeta
->
id
.
uid
,
pTableMeta
->
id
.
tid
,
tscDebug
(
"
0x%"
PRIx64
" recv table meta, uid:%"
PRIu64
", tid:%d, name:%s"
,
pSql
->
self
,
pTableMeta
->
id
.
uid
,
pTableMeta
->
id
.
tid
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
));
free
(
pTableMeta
);
...
...
@@ -2124,7 +2125,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_SUCCESS;
pSql->res.numOfTotal = i;
tscDebug("
%p load multi-metermeta resp from complete num:%d", pSql
, pSql->res.numOfTotal);
tscDebug("
0x%"PRIx64" load multi-metermeta resp from complete num:%d", pSql->self
, pSql->res.numOfTotal);
#endif
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2292,7 +2293,7 @@ static void createHbObj(STscObj* pObj) {
pSql
->
signature
=
pSql
;
registerSqlObj
(
pSql
);
tscDebug
(
"
%p HB is allocated, pObj:%p"
,
pSql
,
pObj
);
tscDebug
(
"
0x%"
PRIx64
" HB is allocated, pObj:%p"
,
pSql
->
self
,
pObj
);
pObj
->
hbrid
=
pSql
->
self
;
}
...
...
@@ -2318,7 +2319,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
tscUpdateMgmtEpSet
(
pSql
,
&
pConnect
->
epSet
);
for
(
int
i
=
0
;
i
<
pConnect
->
epSet
.
numOfEps
;
++
i
)
{
tscDebug
(
"
%p epSet.fqdn[%d]: %s, pObj:%p"
,
pSql
,
i
,
pConnect
->
epSet
.
fqdn
[
i
],
pObj
);
tscDebug
(
"
0x%"
PRIx64
" epSet.fqdn[%d]: %s, pObj:%p"
,
pSql
->
self
,
i
,
pConnect
->
epSet
.
fqdn
[
i
],
pObj
);
}
}
...
...
@@ -2362,7 +2363,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
name
);
taosHashRemove
(
tscTableMetaInfo
,
name
,
strnlen
(
name
,
TSDB_TABLE_FNAME_LEN
));
tscDebug
(
"
%p remove table meta after drop table:%s, numOfRemain:%d"
,
pSql
,
name
,
(
int32_t
)
taosHashGetSize
(
tscTableMetaInfo
));
tscDebug
(
"
0x%"
PRIx64
" remove table meta after drop table:%s, numOfRemain:%d"
,
pSql
->
self
,
name
,
(
int32_t
)
taosHashGetSize
(
tscTableMetaInfo
));
pTableMetaInfo
->
pTableMeta
=
NULL
;
return
0
;
...
...
@@ -2374,7 +2375,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
name
);
tscDebug
(
"
%p remove tableMeta in hashMap after alter-table: %s"
,
pSql
,
name
);
tscDebug
(
"
0x%"
PRIx64
" remove tableMeta in hashMap after alter-table: %s"
,
pSql
->
self
,
name
);
bool
isSuperTable
=
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
);
taosHashRemove
(
tscTableMetaInfo
,
name
,
strnlen
(
name
,
TSDB_TABLE_FNAME_LEN
));
...
...
@@ -2405,7 +2406,7 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
pRes
->
data
=
NULL
;
tscResetForNextRetrieve
(
pRes
);
tscDebug
(
"
%p query rsp received, qId:%"
PRIu64
,
pSql
,
pRes
->
qId
);
tscDebug
(
"
0x%"
PRIx64
" query rsp received, qId:0x%"
PRIx64
,
pSql
->
self
,
pRes
->
qId
);
return
0
;
}
...
...
@@ -2463,7 +2464,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
}
pRes
->
row
=
0
;
tscDebug
(
"
%p numOfRows:%d, offset:%"
PRId64
", complete:%d, qId:%"
PRIu64
,
pSql
,
pRes
->
numOfRows
,
pRes
->
offset
,
tscDebug
(
"
0x%"
PRIx64
" numOfRows:%d, offset:%"
PRId64
", complete:%d, qId:0x%"
PRIx64
,
pSql
->
self
,
pRes
->
numOfRows
,
pRes
->
offset
,
pRes
->
completed
,
pRes
->
qId
);
return
0
;
...
...
@@ -2507,14 +2508,14 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
}
}
tscDebug
(
"%p new pSqlObj:%p to get tableMeta, auto create:%d"
,
pSql
,
pNew
,
pNew
->
cmd
.
autoCreated
);
registerSqlObj
(
pNew
);
tscDebug
(
"0x%"
PRIx64
" new pSqlObj:0x%"
PRIx64
" to get tableMeta, auto create:%d"
,
pSql
->
self
,
pNew
->
self
,
pNew
->
cmd
.
autoCreated
);
pNew
->
fp
=
tscTableMetaCallBack
;
pNew
->
param
=
(
void
*
)
pSql
->
self
;
tscDebug
(
"
%p metaRid from %"
PRId64
" to %"
PRId64
,
pSql
,
pSql
->
metaRid
,
pNew
->
self
);
tscDebug
(
"
0x%"
PRIx64
" metaRid from %"
PRId64
" to %"
PRId64
,
pSql
->
self
,
pSql
->
metaRid
,
pNew
->
self
);
pSql
->
metaRid
=
pNew
->
self
;
...
...
@@ -2585,7 +2586,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
pTableMeta
)
{
tscDebug
(
"
%p update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%"
PRId64
,
pSql
,
name
,
tscDebug
(
"
0x%"
PRIx64
" update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%"
PRId64
,
pSql
->
self
,
name
,
tscGetNumOfTags
(
pTableMeta
),
tscGetNumOfColumns
(
pTableMeta
),
pTableMeta
->
id
.
uid
);
}
...
...
@@ -2645,12 +2646,12 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNewQueryInfo
->
numOfTables
=
pQueryInfo
->
numOfTables
;
registerSqlObj
(
pNew
);
tscDebug
(
"
%p svgroupRid from %"
PRId64
" to %"
PRId64
,
pSql
,
pSql
->
svgroupRid
,
pNew
->
self
);
tscDebug
(
"
0x%"
PRIx64
" svgroupRid from %"
PRId64
" to %"
PRId64
,
pSql
->
self
,
pSql
->
svgroupRid
,
pNew
->
self
);
pSql
->
svgroupRid
=
pNew
->
self
;
tscDebug
(
"
%p new sqlObj:%p to get vgroupInfo, numOfTables:%d"
,
pSql
,
pNew
,
pNewQueryInfo
->
numOfTables
);
tscDebug
(
"
0x%"
PRIx64
" new sqlObj:%p to get vgroupInfo, numOfTables:%d"
,
pSql
->
self
,
pNew
,
pNewQueryInfo
->
numOfTables
);
pNew
->
fp
=
tscTableMetaCallBack
;
pNew
->
param
=
(
void
*
)
pSql
->
self
;
...
...
src/client/src/tscSql.c
浏览文件 @
e8bc9be6
...
...
@@ -292,7 +292,7 @@ void taos_close(TAOS *taos) {
pHb
->
rpcRid
=
-
1
;
}
tscDebug
(
"
%p HB is freed"
,
pHb
);
tscDebug
(
"
0x%"
PRIx64
" HB is freed"
,
pHb
->
self
);
taosReleaseRef
(
tscObjRef
,
pHb
->
self
);
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
...
...
@@ -576,7 +576,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
cmd
==
TSDB_SQL_FETCH
))
{
pQueryInfo
->
type
=
TSDB_QUERY_TYPE_FREE_RESOURCE
;
pCmd
->
command
=
(
pCmd
->
command
>
TSDB_SQL_MGMT
)
?
TSDB_SQL_RETRIEVE
:
TSDB_SQL_FETCH
;
tscDebug
(
"
%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s"
,
pSql
,
sqlCmd
[
pCmd
->
command
]);
tscDebug
(
"
0x%"
PRIx64
" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s"
,
pSql
->
self
,
sqlCmd
[
pCmd
->
command
]);
tscProcessSql
(
pSql
);
return
false
;
...
...
@@ -594,7 +594,7 @@ void taos_free_result(TAOS_RES *res) {
bool
freeNow
=
tscKillQueryInDnode
(
pSql
);
if
(
freeNow
)
{
tscDebug
(
"
%p free sqlObj in cache"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" free sqlObj in cache"
,
pSql
->
self
);
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
}
}
...
...
@@ -708,7 +708,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
tscUnlockByThread
(
&
pSql
->
squeryLock
);
tscDebug
(
"
%p super table query cancelled"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" super table query cancelled"
,
pSql
->
self
);
}
void
taos_stop_query
(
TAOS_RES
*
res
)
{
...
...
@@ -717,7 +717,7 @@ void taos_stop_query(TAOS_RES *res) {
return
;
}
tscDebug
(
"
%p start to cancel query"
,
res
);
tscDebug
(
"
0x%"
PRIx64
" start to cancel query"
,
pSql
->
self
);
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
// set the error code for master pSqlObj firstly
...
...
@@ -744,7 +744,7 @@ void taos_stop_query(TAOS_RES *res) {
}
}
tscDebug
(
"
%p query is cancelled"
,
res
);
tscDebug
(
"
0x%"
PRIx64
" query is cancelled"
,
pSql
->
self
);
}
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
)
{
...
...
@@ -877,7 +877,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pRes
->
numOfClauseTotal
=
0
;
tscDebug
(
"
%p Valid SQL: %s pObj:%p"
,
pSql
,
sql
,
pObj
);
tscDebug
(
"
0x%"
PRIx64
" Valid SQL: %s pObj:%p"
,
pSql
->
self
,
sql
,
pObj
);
int32_t
sqlLen
=
(
int32_t
)
strlen
(
sql
);
if
(
sqlLen
>
tsMaxSQLStringLen
)
{
...
...
@@ -889,7 +889,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pSql
->
sqlstr
=
realloc
(
pSql
->
sqlstr
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"%p failed to malloc sql string buffer"
,
pSql
);
tscDebug
(
"
%p Valid SQL result:%d, %s pObj:%p"
,
pSql
,
pRes
->
code
,
taos_errstr
(
pSql
),
pObj
);
tscDebug
(
"
0x%"
PRIx64
" Valid SQL result:%d, %s pObj:%p"
,
pSql
->
self
,
pRes
->
code
,
taos_errstr
(
pSql
),
pObj
);
tfree
(
pSql
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
@@ -914,7 +914,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"
%p Valid SQL result:%d, %s pObj:%p"
,
pSql
,
code
,
taos_errstr
(
pSql
),
pObj
);
tscDebug
(
"
0x%"
PRIx64
" Valid SQL result:%d, %s pObj:%p"
,
pSql
->
self
,
code
,
taos_errstr
(
pSql
),
pObj
);
}
taos_free_result
(
pSql
);
...
...
@@ -1027,7 +1027,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pRes
->
numOfClauseTotal
=
0
;
assert
(
pSql
->
fp
==
NULL
);
tscDebug
(
"
%p tableNameList: %s pObj:%p"
,
pSql
,
tableNameList
,
pObj
);
tscDebug
(
"
0x%"
PRIx64
" tableNameList: %s pObj:%p"
,
pSql
->
self
,
tableNameList
,
pObj
);
int32_t
tblListLen
=
(
int32_t
)
strlen
(
tableNameList
);
if
(
tblListLen
>
MAX_TABLE_NAME_LENGTH
)
{
...
...
@@ -1061,7 +1061,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
tscDoQuery
(
pSql
);
tscDebug
(
"
%p load multi table meta result:%d %s pObj:%p"
,
pSql
,
pRes
->
code
,
taos_errstr
(
pSql
),
pObj
);
tscDebug
(
"
0x%"
PRIx64
" load multi table meta result:%d %s pObj:%p"
,
pSql
->
self
,
pRes
->
code
,
taos_errstr
(
pSql
),
pObj
);
if
((
code
=
pRes
->
code
)
!=
TSDB_CODE_SUCCESS
)
{
tscFreeSqlObj
(
pSql
);
}
...
...
src/client/src/tscStream.c
浏览文件 @
e8bc9be6
...
...
@@ -70,7 +70,7 @@ static void setRetryInfo(SSqlStream* pStream, int32_t code) {
pSql
->
res
.
code
=
code
;
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
interval
.
sliding
,
pStream
->
precision
);
tscDebug
(
"
%p stream:%p, get table Meta failed, retry in %"
PRId64
"ms"
,
pSql
,
pStream
,
retryDelayTime
);
tscDebug
(
"
0x%"
PRIx64
" stream:%p, get table Meta failed, retry in %"
PRId64
"ms"
,
pSql
->
self
,
pStream
,
retryDelayTime
);
tscSetRetryTimer
(
pStream
,
pSql
,
retryDelayTime
);
}
...
...
@@ -104,7 +104,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
// failed to get table Meta or vgroup list, retry in 10sec.
if
(
code
==
TSDB_CODE_SUCCESS
)
{
tscTansformFuncForSTableQuery
(
pQueryInfo
);
tscDebug
(
"
%p stream:%p, start stream query on:%s"
,
pSql
,
pStream
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
));
tscDebug
(
"
0x%"
PRIx64
" stream:%p, start stream query on:%s"
,
pSql
->
self
,
pStream
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
));
pSql
->
fp
=
tscProcessStreamQueryCallback
;
pSql
->
fetchFp
=
tscProcessStreamQueryCallback
;
...
...
@@ -131,7 +131,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream
->
numOfRes
=
0
;
// reset the numOfRes.
SSqlObj
*
pSql
=
pStream
->
pSql
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
tscDebug
(
"
%p add into timer"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" add into timer"
,
pSql
->
self
);
if
(
pStream
->
isProject
)
{
/*
...
...
@@ -237,7 +237,7 @@ static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
}
if (rowNum > 0) {
tscDebug("
%p
stream:%p %d rows padded", pSql, pStream, rowNum);
tscDebug("
0x%"PRIx64"
stream:%p %d rows padded", pSql, pStream, rowNum);
}
pRes->numOfRows = 0;
...
...
@@ -263,7 +263,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
TAOS_ROW
row
=
taos_fetch_row
(
res
);
if
(
row
!=
NULL
)
{
tscDebug
(
"
%p stream:%p fetch result"
,
pSql
,
pStream
);
tscDebug
(
"
0x%"
PRIx64
" stream:%p fetch result"
,
pSql
->
self
,
pStream
);
tscStreamFillTimeGap
(
pStream
,
*
(
TSKEY
*
)
row
[
0
]);
pStream
->
stime
=
*
(
TSKEY
*
)
row
[
0
];
// user callback function
...
...
@@ -293,7 +293,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream
->
stime
+=
1
;
}
tscDebug
(
"
%p stream:%p, query on:%s, fetch result completed, fetched rows:%"
PRId64
,
pSql
,
pStream
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
),
tscDebug
(
"
0x%"
PRIx64
" stream:%p, query on:%s, fetch result completed, fetched rows:%"
PRId64
,
pSql
->
self
,
pStream
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
),
pStream
->
numOfRes
);
tfree
(
pTableMetaInfo
->
pTableMeta
);
...
...
@@ -318,8 +318,8 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
/*
* current time window will be closed, since it too early to exceed the maxRetentWindow value
*/
tscDebug
(
"
%p
stream:%p, etime:%"
PRId64
" is too old, exceeds the max retention time window:%"
PRId64
", stop the stream"
,
pStream
->
pSql
,
pStream
,
pStream
->
stime
,
pStream
->
etime
);
tscDebug
(
"
0x%"
PRIx64
"
stream:%p, etime:%"
PRId64
" is too old, exceeds the max retention time window:%"
PRId64
", stop the stream"
,
pStream
->
pSql
->
self
,
pStream
,
pStream
->
stime
,
pStream
->
etime
);
// TODO : How to terminate stream here
if
(
pStream
->
callback
)
{
// Callback function from upper level
...
...
@@ -329,10 +329,10 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
return
;
}
tscDebug
(
"
%p stream:%p, next start at %"
PRId64
", in %"
PRId64
"ms. delay:%"
PRId64
"ms qrange %"
PRId64
"-%"
PRId64
,
pStream
->
pSql
,
pStream
,
tscDebug
(
"
0x%"
PRIx64
" stream:%p, next start at %"
PRId64
", in %"
PRId64
"ms. delay:%"
PRId64
"ms qrange %"
PRId64
"-%"
PRId64
,
pStream
->
pSql
->
self
,
pStream
,
now
+
timer
,
timer
,
delay
,
pStream
->
stime
,
etime
);
}
else
{
tscDebug
(
"
%p stream:%p, next start at %"
PRId64
", in %"
PRId64
"ms. delay:%"
PRId64
"ms qrange %"
PRId64
"-%"
PRId64
,
pStream
->
pSql
,
pStream
,
tscDebug
(
"
0x%"
PRIx64
" stream:%p, next start at %"
PRId64
", in %"
PRId64
"ms. delay:%"
PRId64
"ms qrange %"
PRId64
"-%"
PRId64
,
pStream
->
pSql
->
self
,
pStream
,
pStream
->
stime
,
timer
,
delay
,
pStream
->
stime
-
pStream
->
interval
.
interval
,
pStream
->
stime
-
1
);
}
...
...
@@ -378,8 +378,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
*/
timer
=
pStream
->
interval
.
sliding
;
if
(
pStream
->
stime
>
pStream
->
etime
)
{
tscDebug
(
"
%p stream:%p, stime:%"
PRId64
" is larger than end time: %"
PRId64
", stop the stream"
,
pStream
->
pSql
,
pStream
,
pStream
->
stime
,
pStream
->
etime
);
tscDebug
(
"
0x%"
PRIx64
" stream:%p, stime:%"
PRId64
" is larger than end time: %"
PRId64
", stop the stream"
,
pStream
->
pSql
->
self
,
pStream
,
pStream
->
stime
,
pStream
->
etime
);
// TODO : How to terminate stream here
if
(
pStream
->
callback
)
{
// Callback function from upper level
...
...
@@ -392,7 +392,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
int64_t
stime
=
taosTimeTruncate
(
pStream
->
stime
-
1
,
&
pStream
->
interval
,
pStream
->
precision
);
//int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
if
(
stime
>=
pStream
->
etime
)
{
tscDebug
(
"
%p stream:%p, stime:%"
PRId64
" is larger than end time: %"
PRId64
", stop the stream"
,
pStream
->
pSql
,
pStream
,
tscDebug
(
"
0x%"
PRIx64
" stream:%p, stime:%"
PRId64
" is larger than end time: %"
PRId64
", stop the stream"
,
pStream
->
pSql
->
self
,
pStream
,
pStream
->
stime
,
pStream
->
etime
);
// TODO : How to terminate stream here
if
(
pStream
->
callback
)
{
...
...
@@ -558,7 +558,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
taosTmrReset
(
tscProcessStreamTimer
,
(
int32_t
)
starttime
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
tscDebug
(
"
%p stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
,
tscDebug
(
"
0x%"
PRIx64
" stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
->
self
,
pStream
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
),
pStream
->
interval
.
interval
,
pStream
->
interval
.
sliding
,
starttime
,
pSql
->
sqlstr
);
}
...
...
@@ -645,7 +645,7 @@ void taos_close_stream(TAOS_STREAM *handle) {
taosTmrStopA
(
&
(
pStream
->
pTimer
));
tscDebug
(
"
%p stream:%p is closed"
,
pSql
,
pStream
);
tscDebug
(
"
0x%"
PRIx64
" stream:%p is closed"
,
pSql
->
self
,
pStream
);
// notify CQ to release the pStream object
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
pStream
->
pSql
=
NULL
;
...
...
src/client/src/tscSubquery.c
浏览文件 @
e8bc9be6
此差异已折叠。
点击以展开。
src/client/src/tscUtil.c
浏览文件 @
e8bc9be6
...
...
@@ -496,10 +496,10 @@ void tscFreeSubobj(SSqlObj* pSql) {
return
;
}
tscDebug
(
"
%p start to free sub SqlObj, numOfSub:%d"
,
pSql
,
pSql
->
subState
.
numOfSub
);
tscDebug
(
"
0x%"
PRIx64
" start to free sub SqlObj, numOfSub:%d"
,
pSql
->
self
,
pSql
->
subState
.
numOfSub
);
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
tscDebug
(
"
%p free sub SqlObj:%p, index:%d"
,
pSql
,
pSql
->
pSubs
[
i
],
i
);
tscDebug
(
"
0x%"
PRIx64
" free sub SqlObj:%p, index:%d"
,
pSql
->
self
,
pSql
->
pSubs
[
i
],
i
);
taos_free_result
(
pSql
->
pSubs
[
i
]);
pSql
->
pSubs
[
i
]
=
NULL
;
}
...
...
@@ -530,7 +530,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
int32_t
num
=
atomic_sub_fetch_32
(
&
pTscObj
->
numOfObj
,
1
);
int32_t
total
=
atomic_sub_fetch_32
(
&
tscNumOfObj
,
1
);
tscDebug
(
"
%p free SqlObj, total in tscObj:%d, total:%d"
,
pSql
,
num
,
total
);
tscDebug
(
"
0x%"
PRIx64
" free SqlObj, total in tscObj:%d, total:%d"
,
p
->
self
,
num
,
total
);
tscFreeSqlObj
(
p
);
taosReleaseRef
(
tscRefId
,
pTscObj
->
rid
);
...
...
@@ -553,7 +553,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
return
;
}
tscDebug
(
"
%p start to free sqlObj"
,
pSql
);
tscDebug
(
"
0x%"
PRIx64
" start to free sqlObj"
,
pSql
->
self
);
pSql
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
...
...
@@ -945,7 +945,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
tscSortRemoveDataBlockDupRows
(
pOneTableBlock
);
char
*
ekey
=
(
char
*
)
pBlocks
->
data
+
pOneTableBlock
->
rowSize
*
(
pBlocks
->
numOfRows
-
1
);
tscDebug
(
"
%p name:%s, name:%d rows:%d sversion:%d skey:%"
PRId64
", ekey:%"
PRId64
,
pSql
,
tNameGetTableName
(
&
pOneTableBlock
->
tableName
),
tscDebug
(
"
0x%"
PRIx64
" name:%s, name:%d rows:%d sversion:%d skey:%"
PRId64
", ekey:%"
PRId64
,
pSql
->
self
,
tNameGetTableName
(
&
pOneTableBlock
->
tableName
),
pBlocks
->
tid
,
pBlocks
->
numOfRows
,
pBlocks
->
sversion
,
GET_INT64_VAL
(
pBlocks
->
data
),
GET_INT64_VAL
(
ekey
));
int32_t
len
=
pBlocks
->
numOfRows
*
(
pOneTableBlock
->
rowSize
+
expandSize
)
+
sizeof
(
STColumn
)
*
tscGetNumOfColumns
(
pOneTableBlock
->
pTableMeta
);
...
...
@@ -2109,7 +2109,7 @@ void registerSqlObj(SSqlObj* pSql) {
int32_t
num
=
atomic_add_fetch_32
(
&
pSql
->
pTscObj
->
numOfObj
,
1
);
int32_t
total
=
atomic_add_fetch_32
(
&
tscNumOfObj
,
1
);
tscDebug
(
"
%p new SqlObj from %p, total in tscObj:%d, total:%d"
,
pSql
,
pSql
->
pTscObj
,
num
,
total
);
tscDebug
(
"
0x%"
PRIx64
" new SqlObj from %p, total in tscObj:%d, total:%d"
,
pSql
->
self
,
pSql
->
pTscObj
,
num
,
total
);
}
SSqlObj
*
createSimpleSubObj
(
SSqlObj
*
pSql
,
__async_cb_func_t
fp
,
void
*
param
,
int32_t
cmd
)
{
...
...
@@ -2376,7 +2376,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
tscPrintSelectClause
(
pNew
,
0
);
}
else
{
tscDebug
(
"
%p new sub insertion: %p, vnodeIdx:%d"
,
pSql
,
pNew
,
pTableMetaInfo
->
vgroupIndex
);
tscDebug
(
"
0x%"
PRIx64
" new sub insertion: %p, vnodeIdx:%d"
,
pSql
->
self
,
pNew
,
pTableMetaInfo
->
vgroupIndex
);
}
registerSqlObj
(
pNew
);
...
...
@@ -2627,7 +2627,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
int32_t
totalVgroups
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
if
(
++
pTableMetaInfo
->
vgroupIndex
<
totalVgroups
)
{
tscDebug
(
"
%p results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%"
PRId64
,
pSql
,
tscDebug
(
"
0x%"
PRIx64
" results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%"
PRId64
,
pSql
->
self
,
pTableMetaInfo
->
vgroupIndex
-
1
,
pTableMetaInfo
->
vgroupIndex
,
totalVgroups
,
pRes
->
numOfClauseTotal
);
/*
...
...
@@ -2646,8 +2646,8 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
pQueryInfo
->
limit
.
offset
=
pRes
->
offset
;
assert
((
pRes
->
offset
>=
0
&&
pRes
->
numOfRows
==
0
)
||
(
pRes
->
offset
==
0
&&
pRes
->
numOfRows
>=
0
));
tscDebug
(
"
%p
new query to next vgroup, index:%d, limit:%"
PRId64
", offset:%"
PRId64
", glimit:%"
PRId64
,
pSql
,
pTableMetaInfo
->
vgroupIndex
,
pQueryInfo
->
limit
.
limit
,
pQueryInfo
->
limit
.
offset
,
pQueryInfo
->
clauseLimit
);
tscDebug
(
"
0x%"
PRIx64
"
new query to next vgroup, index:%d, limit:%"
PRId64
", offset:%"
PRId64
", glimit:%"
PRId64
,
pSql
->
self
,
pTableMetaInfo
->
vgroupIndex
,
pQueryInfo
->
limit
.
limit
,
pQueryInfo
->
limit
.
offset
,
pQueryInfo
->
clauseLimit
);
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
...
...
@@ -2664,7 +2664,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
pSql
->
fp
=
fp
;
tscProcessSql
(
pSql
);
}
else
{
tscDebug
(
"
%p try all %d vnodes, query complete. current numOfRes:%"
PRId64
,
pSql
,
totalVgroups
,
pRes
->
numOfClauseTotal
);
tscDebug
(
"
0x%"
PRIx64
" try all %d vnodes, query complete. current numOfRes:%"
PRId64
,
pSql
->
self
,
totalVgroups
,
pRes
->
numOfClauseTotal
);
}
}
...
...
@@ -2690,7 +2690,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
pSql
->
subState
.
numOfSub
=
0
;
pSql
->
fp
=
fp
;
tscDebug
(
"
%p try data in the next subclause:%d, total subclause:%d"
,
pSql
,
pCmd
->
clauseIndex
,
pCmd
->
numOfClause
);
tscDebug
(
"
0x%"
PRIx64
" try data in the next subclause:%d, total subclause:%d"
,
pSql
->
self
,
pCmd
->
clauseIndex
,
pCmd
->
numOfClause
);
if
(
pCmd
->
command
>
TSDB_SQL_LOCAL
)
{
tscProcessLocalCmd
(
pSql
);
}
else
{
...
...
src/query/src/qExecutor.c
浏览文件 @
e8bc9be6
...
...
@@ -1359,7 +1359,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
type
==
TSDB_DATA_TYPE_FLOAT
||
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
qError
(
"QInfo:
%"
PRIu
64
" group by not supported on double/float columns, abort"
,
GET_QID
(
pRuntimeEnv
));
qError
(
"QInfo:
0x%"
PRIx
64
" group by not supported on double/float columns, abort"
,
GET_QID
(
pRuntimeEnv
));
return
;
}
...
...
@@ -1746,7 +1746,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
}
static
int32_t
setupQueryRuntimeEnv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
numOfTables
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" setup runtime env"
,
GET_QID
(
pRuntimeEnv
));
qDebug
(
"QInfo:
0x%"
PRIx
64
" setup runtime env"
,
GET_QID
(
pRuntimeEnv
));
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
pRuntimeEnv
->
prevGroupId
=
INT32_MIN
;
...
...
@@ -1779,7 +1779,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
*
(
int64_t
*
)
pRuntimeEnv
->
prevRow
[
0
]
=
INT64_MIN
;
}
qDebug
(
"QInfo:
%"
PRIu
64
" init runtime environment completed"
,
GET_QID
(
pRuntimeEnv
));
qDebug
(
"QInfo:
0x%"
PRIx
64
" init runtime environment completed"
,
GET_QID
(
pRuntimeEnv
));
// group by normal column, sliding window query, interval query are handled by interval query processor
// interval (down sampling operation)
...
...
@@ -1895,7 +1895,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
pRuntimeEnv
->
qinfo
;
qDebug
(
"QInfo:
%"
PRIu
64
" teardown runtime env"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" teardown runtime env"
,
pQInfo
->
qId
);
if
(
pRuntimeEnv
->
sasArray
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
...
...
@@ -2121,7 +2121,7 @@ bool colIdCheck(SQuery *pQuery, uint64_t qId) {
// load data column information is incorrect
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
-
1
;
++
i
)
{
if
(
pQuery
->
colList
[
i
].
colId
==
pQuery
->
colList
[
i
+
1
].
colId
)
{
qError
(
"QInfo:
%"
PRIu
64
" invalid data load column for query"
,
qId
);
qError
(
"QInfo:
0x%"
PRIx
64
" invalid data load column for query"
,
qId
);
return
false
;
}
}
...
...
@@ -2208,13 +2208,13 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
// in case of point-interpolation query, use asc order scan
char
msg
[]
=
"QInfo:
%"
PRIu
64
" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
PRId64
char
msg
[]
=
"QInfo:
0x%"
PRIx
64
" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
PRId64
"-%"
PRId64
", new qrange:%"
PRId64
"-%"
PRId64
;
// todo handle the case the the order irrelevant query type mixed up with order critical query type
// descending order query for last_row query
if
(
isFirstLastRowQuery
(
pQuery
))
{
qDebug
(
"QInfo:
%"
PRIu
64
" scan order changed for last_row query, old:%d, new:%d"
,
pQInfo
->
qId
,
pQuery
->
order
.
order
,
TSDB_ORDER_ASC
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" scan order changed for last_row query, old:%d, new:%d"
,
pQInfo
->
qId
,
pQuery
->
order
.
order
,
TSDB_ORDER_ASC
);
pQuery
->
order
.
order
=
TSDB_ORDER_ASC
;
if
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
)
{
...
...
@@ -2692,7 +2692,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
*
status
=
updateBlockLoadStatus
(
pRuntimeEnv
->
pQuery
,
*
status
);
if
((
*
status
)
==
BLK_DATA_NO_NEEDED
||
(
*
status
)
==
BLK_DATA_DISCARD
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
pQInfo
->
qId
,
pBlockInfo
->
window
.
skey
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
pQInfo
->
qId
,
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
discardBlocks
+=
1
;
}
else
if
((
*
status
)
==
BLK_DATA_STATIS_NEEDED
)
{
...
...
@@ -2735,7 +2735,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
(
char
*
)
&
(
pBlock
->
pBlockStatis
[
i
].
max
));
if
(
!
load
)
{
// current block has been discard due to filter applied
pCost
->
discardBlocks
+=
1
;
qDebug
(
"QInfo:
%"
PRIu
64
" data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
pQInfo
->
qId
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
pQInfo
->
qId
,
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
(
*
status
)
=
BLK_DATA_DISCARD
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2747,7 +2747,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// current block has been discard due to filter applied
if
(
!
doFilterByBlockStatistics
(
pRuntimeEnv
,
pBlock
->
pBlockStatis
,
pTableScanInfo
->
pCtx
,
pBlockInfo
->
rows
))
{
pCost
->
discardBlocks
+=
1
;
qDebug
(
"QInfo:
%"
PRIu
64
" data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
pQInfo
->
qId
,
pBlockInfo
->
window
.
skey
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
pQInfo
->
qId
,
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
(
*
status
)
=
BLK_DATA_DISCARD
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -3414,10 +3414,10 @@ void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExpr
int16_t
tagType
=
pCtx
[
0
].
tag
.
nType
;
if
(
tagType
==
TSDB_DATA_TYPE_BINARY
||
tagType
==
TSDB_DATA_TYPE_NCHAR
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" set tag value for join comparison, colId:%"
PRId64
", val:%s"
,
GET_QID
(
pRuntimeEnv
),
qDebug
(
"QInfo:
0x%"
PRIx
64
" set tag value for join comparison, colId:%"
PRId64
", val:%s"
,
GET_QID
(
pRuntimeEnv
),
pExprInfo
->
base
.
arg
->
argValue
.
i64
,
pCtx
[
0
].
tag
.
pz
);
}
else
{
qDebug
(
"QInfo:
%"
PRIu
64
" set tag value for join comparison, colId:%"
PRId64
", val:%"
PRId64
,
GET_QID
(
pRuntimeEnv
),
qDebug
(
"QInfo:
0x%"
PRIx
64
" set tag value for join comparison, colId:%"
PRId64
", val:%"
PRId64
,
GET_QID
(
pRuntimeEnv
),
pExprInfo
->
base
.
arg
->
argValue
.
i64
,
pCtx
[
0
].
tag
.
i64
);
}
}
...
...
@@ -3437,9 +3437,9 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag,
// failed to find data with the specified tag value and vnodeId
if
(
!
tsBufIsValidElem
(
&
elem
))
{
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qError
(
"QInfo:
%"
PRIu
64
" failed to find tag:%s in ts_comp"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
pz
);
qError
(
"QInfo:
0x%"
PRIx
64
" failed to find tag:%s in ts_comp"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
pz
);
}
else
{
qError
(
"QInfo:
%"
PRIu
64
" failed to find tag:%"
PRId64
" in ts_comp"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
i64
);
qError
(
"QInfo:
0x%"
PRIx
64
" failed to find tag:%"
PRId64
" in ts_comp"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
i64
);
}
return
-
1
;
...
...
@@ -3448,17 +3448,17 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag,
// Keep the cursor info of current table
pTableQueryInfo
->
cur
=
tsBufGetCursor
(
pRuntimeEnv
->
pTsBuf
);
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
pz
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
pz
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
}
else
{
qDebug
(
"QInfo:
%"
PRIu
64
" find tag:%"
PRId64
" start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
i64
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" find tag:%"
PRId64
" start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
i64
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
}
}
else
{
tsBufSetCursor
(
pRuntimeEnv
->
pTsBuf
,
&
pTableQueryInfo
->
cur
);
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
pz
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
pz
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
}
else
{
qDebug
(
"QInfo:
%"
PRIu
64
" find tag:%"
PRId64
" start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
i64
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" find tag:%"
PRId64
" start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
GET_QID
(
pRuntimeEnv
),
pTag
->
i64
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
}
}
...
...
@@ -3596,7 +3596,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
int32_t
start
=
0
;
int32_t
step
=
-
1
;
qDebug
(
"QInfo:
%"
PRIu
64
" start to copy data from windowResInfo to output buf"
,
GET_QID
(
pRuntimeEnv
));
qDebug
(
"QInfo:
0x%"
PRIx
64
" start to copy data from windowResInfo to output buf"
,
GET_QID
(
pRuntimeEnv
));
if
(
orderType
==
TSDB_ORDER_ASC
)
{
start
=
pGroupResInfo
->
index
;
step
=
1
;
...
...
@@ -3636,7 +3636,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
}
}
qDebug
(
"QInfo:
%"
PRIu
64
" copy data to query buf completed"
,
GET_QID
(
pRuntimeEnv
));
qDebug
(
"QInfo:
0x%"
PRIx
64
" copy data to query buf completed"
,
GET_QID
(
pRuntimeEnv
));
pBlock
->
info
.
rows
=
numOfResult
;
return
0
;
}
...
...
@@ -3722,11 +3722,11 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
data
+=
sizeof
(
STableIdInfo
);
total
++
;
qDebug
(
"QInfo:
%"
PRIu
64
" set subscribe info, tid:%d, uid:%"
PRIu64
", skey:%"
PRId64
,
pQInfo
->
qId
,
item
->
tid
,
item
->
uid
,
item
->
key
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" set subscribe info, tid:%d, uid:%"
PRIu64
", skey:%"
PRId64
,
pQInfo
->
qId
,
item
->
tid
,
item
->
uid
,
item
->
key
);
item
=
taosHashIterate
(
pRuntimeEnv
->
pTableRetrieveTsMap
,
item
);
}
qDebug
(
"QInfo:
%"
PRIu
64
" set %d subscribe info"
,
pQInfo
->
qId
,
total
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" set %d subscribe info"
,
pQInfo
->
qId
,
total
);
// Check if query is completed or not for stable query or normal table query respectively.
if
(
Q_STATUS_EQUAL
(
pRuntimeEnv
->
status
,
QUERY_COMPLETED
)
&&
pRuntimeEnv
->
proot
->
status
==
OP_EXEC_DONE
)
{
setQueryStatus
(
pRuntimeEnv
,
QUERY_OVER
);
...
...
@@ -3765,12 +3765,12 @@ void queryCostStatis(SQInfo *pQInfo) {
pSummary
->
numOfTimeWindows
=
0
;
}
qDebug
(
"QInfo:
%"
PRIu
64
" :cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
" us, total blocks:%d, "
qDebug
(
"QInfo:
0x%"
PRIx
64
" :cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
pQInfo
->
qId
,
pSummary
->
elapsedTime
,
pSummary
->
firstStageMergeTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
qDebug
(
"QInfo:
%"
PRIu
64
" :cost summary: winResPool size:%.2f Kb, numOfWin:%"
PRId64
", tableInfoSize:%.2f Kb, hashTable:%.2f Kb"
,
pQInfo
->
qId
,
pSummary
->
winInfoSize
/
1024
.
0
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" :cost summary: winResPool size:%.2f Kb, numOfWin:%"
PRId64
", tableInfoSize:%.2f Kb, hashTable:%.2f Kb"
,
pQInfo
->
qId
,
pSummary
->
winInfoSize
/
1024
.
0
,
pSummary
->
numOfTimeWindows
,
pSummary
->
tableInfoSize
/
1024
.
0
,
pSummary
->
hashSize
/
1024
.
0
);
}
...
...
@@ -3806,7 +3806,7 @@ void queryCostStatis(SQInfo *pQInfo) {
//
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//
// qDebug("QInfo:
%"PRIu
64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QID(pRuntimeEnv),
// qDebug("QInfo:
0x%"PRIx
64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QID(pRuntimeEnv),
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
//}
...
...
@@ -3836,7 +3836,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey;
// pTableQueryInfo->lastKey += step;
//
// qDebug("QInfo:
%"PRIu
64" skip rows:%d, offset:%" PRId64, GET_QID(pRuntimeEnv), blockInfo.rows,
// qDebug("QInfo:
0x%"PRIx
64" skip rows:%d, offset:%" PRId64, GET_QID(pRuntimeEnv), blockInfo.rows,
// pQuery->limit.offset);
// } else { // find the appropriated start position in current block
// updateOffsetVal(pRuntimeEnv, &blockInfo);
...
...
@@ -3884,7 +3884,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
// pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index
//
// qDebug("QInfo:
%"PRIu
64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64,
// qDebug("QInfo:
0x%"PRIx
64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64,
// GET_QID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
// pQuery->current->lastKey);
//
...
...
@@ -4315,7 +4315,7 @@ static SSDataBlock* doTableScan(void* param) {
pResultRowInfo
->
prevSKey
=
pResultRowInfo
->
pResult
[
0
]
->
win
.
skey
;
}
qDebug
(
"QInfo:
%"
PRIu
64
" start to repeat scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" start to repeat scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_QID
(
pRuntimeEnv
),
cond
.
twindow
.
skey
,
cond
.
twindow
.
ekey
);
}
...
...
@@ -4325,7 +4325,7 @@ static SSDataBlock* doTableScan(void* param) {
STsdbQueryCond
cond
=
createTsdbQueryCond
(
pQuery
,
&
pQuery
->
window
);
tsdbResetQueryHandle
(
pTableScanInfo
->
pQueryHandle
,
&
cond
);
qDebug
(
"QInfo:
%"
PRIu
64
" start to reverse scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" start to reverse scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_QID
(
pRuntimeEnv
),
cond
.
twindow
.
skey
,
cond
.
twindow
.
ekey
);
pRuntimeEnv
->
scanFlag
=
REVERSE_SCAN
;
...
...
@@ -5584,14 +5584,14 @@ static SSDataBlock* doTagScan(void* param) {
count
+=
1
;
}
qDebug
(
"QInfo:
%"
PRIu
64
" create (tableId, tag) info completed, rows:%d"
,
GET_QID
(
pRuntimeEnv
),
count
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" create (tableId, tag) info completed, rows:%d"
,
GET_QID
(
pRuntimeEnv
),
count
);
}
else
if
(
functionId
==
TSDB_FUNC_COUNT
)
{
// handle the "count(tbname)" query
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pRes
->
pDataBlock
,
0
);
*
(
int64_t
*
)
pColInfo
->
pData
=
pInfo
->
totalTables
;
count
=
1
;
pOperator
->
status
=
OP_EXEC_DONE
;
qDebug
(
"QInfo:
%"
PRIu
64
" create count(tbname) query, res:%d rows:1"
,
GET_QID
(
pRuntimeEnv
),
count
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" create count(tbname) query, res:%d rows:1"
,
GET_QID
(
pRuntimeEnv
),
count
);
}
else
{
// return only the tags|table name etc.
SExprInfo
*
pExprInfo
=
pOperator
->
pExpr
;
// todo use the column list instead of exprinfo
...
...
@@ -5630,7 +5630,7 @@ static SSDataBlock* doTagScan(void* param) {
pOperator
->
status
=
OP_EXEC_DONE
;
}
qDebug
(
"QInfo:
%"
PRIu
64
" create tag values results completed, rows:%d"
,
GET_QID
(
pRuntimeEnv
),
count
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" create tag values results completed, rows:%d"
,
GET_QID
(
pRuntimeEnv
),
count
);
}
pRes
->
info
.
rows
=
count
;
...
...
@@ -6422,13 +6422,13 @@ static int32_t createFilterInfo(SQuery *pQuery, uint64_t qId) {
int32_t
lower
=
pSingleColFilter
->
filterInfo
.
lowerRelOptr
;
int32_t
upper
=
pSingleColFilter
->
filterInfo
.
upperRelOptr
;
if
(
lower
==
TSDB_RELATION_INVALID
&&
upper
==
TSDB_RELATION_INVALID
)
{
qError
(
"QInfo:
%"
PRIu
64
" invalid filter info"
,
qId
);
qError
(
"QInfo:
0x%"
PRIx
64
" invalid filter info"
,
qId
);
return
TSDB_CODE_QRY_INVALID_MSG
;
}
pSingleColFilter
->
fp
=
getFilterOperator
(
lower
,
upper
);
if
(
pSingleColFilter
->
fp
==
NULL
)
{
qError
(
"QInfo:
%"
PRIu
64
" invalid filter info"
,
qId
);
qError
(
"QInfo:
0x%"
PRIx
64
" invalid filter info"
,
qId
);
return
TSDB_CODE_QRY_INVALID_MSG
;
}
...
...
@@ -6659,7 +6659,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
// todo refactor
pQInfo
->
query
.
queryBlockDist
=
(
numOfOutput
==
1
&&
pExprs
[
0
].
base
.
colInfo
.
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
);
qDebug
(
"qmsg:%p QInfo:
%"
PRIu
64
"-%p created"
,
pQueryMsg
,
pQInfo
->
qId
,
pQInfo
);
qDebug
(
"qmsg:%p QInfo:
0x%"
PRIx
64
"-%p created"
,
pQueryMsg
,
pQInfo
->
qId
,
pQInfo
);
return
pQInfo
;
_cleanup_qinfo:
...
...
@@ -6734,7 +6734,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
if
((
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
))
||
(
!
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
ekey
>
pQuery
->
window
.
skey
)))
{
qDebug
(
"QInfo:
%"
PRIu
64
" no result in time range %"
PRId64
"-%"
PRId64
", order %d"
,
pQInfo
->
qId
,
pQuery
->
window
.
skey
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" no result in time range %"
PRId64
"-%"
PRId64
", order %d"
,
pQInfo
->
qId
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
);
setQueryStatus
(
pRuntimeEnv
,
QUERY_COMPLETED
);
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
=
0
;
...
...
@@ -6743,7 +6743,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
}
if
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" no table qualified for tag filter, abort query"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" no table qualified for tag filter, abort query"
,
pQInfo
->
qId
);
setQueryStatus
(
pRuntimeEnv
,
QUERY_COMPLETED
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -6824,7 +6824,7 @@ void freeQInfo(SQInfo *pQInfo) {
return
;
}
qDebug
(
"QInfo:
%"
PRIu
64
" start to free QInfo"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" start to free QInfo"
,
pQInfo
->
qId
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
releaseQueryBuf
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
);
...
...
@@ -6875,7 +6875,7 @@ void freeQInfo(SQInfo *pQInfo) {
taosArrayDestroy
(
pRuntimeEnv
->
groupResInfo
.
pRows
);
pQInfo
->
signature
=
0
;
qDebug
(
"QInfo:
%"
PRIu
64
" QInfo is freed"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" QInfo is freed"
,
pQInfo
->
qId
);
tfree
(
pQInfo
);
}
...
...
@@ -6895,7 +6895,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
off_t
s
=
lseek
(
fileno
(
f
),
0
,
SEEK_END
);
assert
(
s
==
pRuntimeEnv
->
outputBuf
->
info
.
rows
);
qDebug
(
"QInfo:
%"
PRIu
64
" ts comp data return, file:%p, size:%"
PRId64
,
pQInfo
->
qId
,
f
,
(
uint64_t
)
s
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" ts comp data return, file:%p, size:%"
PRId64
,
pQInfo
->
qId
,
f
,
(
uint64_t
)
s
);
if
(
fseek
(
f
,
0
,
SEEK_SET
)
>=
0
)
{
size_t
sz
=
fread
(
data
,
1
,
s
,
f
);
if
(
sz
<
s
)
{
// todo handle error
...
...
@@ -6927,11 +6927,11 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
}
pRuntimeEnv
->
resultInfo
.
total
+=
pRuntimeEnv
->
outputBuf
->
info
.
rows
;
qDebug
(
"QInfo:
%"
PRIu
64
" current numOfRes rows:%d, total:%"
PRId64
,
pQInfo
->
qId
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" current numOfRes rows:%d, total:%"
PRId64
,
pQInfo
->
qId
,
pRuntimeEnv
->
outputBuf
->
info
.
rows
,
pRuntimeEnv
->
resultInfo
.
total
);
if
(
pQuery
->
limit
.
limit
>
0
&&
pQuery
->
limit
.
limit
==
pRuntimeEnv
->
resultInfo
.
total
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" results limitation reached, limitation:%"
PRId64
,
pQInfo
->
qId
,
pQuery
->
limit
.
limit
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" results limitation reached, limitation:%"
PRId64
,
pQInfo
->
qId
,
pQuery
->
limit
.
limit
);
setQueryStatus
(
pRuntimeEnv
,
QUERY_OVER
);
}
...
...
src/query/src/qResultbuf.c
浏览文件 @
e8bc9be6
...
...
@@ -43,7 +43,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa
pResBuf
->
emptyDummyIdList
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
qDebug
(
"QInfo:
%"
PRIu
64
" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s"
,
qId
,
pResBuf
->
pageSize
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s"
,
qId
,
pResBuf
->
pageSize
,
pResBuf
->
inMemPages
,
pResBuf
->
path
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -410,13 +410,13 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
}
if
(
pResultBuf
->
file
!=
NULL
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb"
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb"
,
pResultBuf
->
qId
,
pResultBuf
->
totalBufSize
/
1024
.
0
,
listNEles
(
pResultBuf
->
lruList
)
*
pResultBuf
->
pageSize
/
1024
.
0
,
pResultBuf
->
fileSize
/
1024
.
0
);
fclose
(
pResultBuf
->
file
);
}
else
{
qDebug
(
"QInfo:
%"
PRIu
64
" res output buffer closed, total:%.2f Kb, no file created"
,
pResultBuf
->
qId
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" res output buffer closed, total:%.2f Kb, no file created"
,
pResultBuf
->
qId
,
pResultBuf
->
totalBufSize
/
1024
.
0
);
}
...
...
src/query/src/queryMain.c
浏览文件 @
e8bc9be6
...
...
@@ -205,7 +205,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pQInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"QInfo:
%"
PRIu
64
"-%p qhandle is now executed by thread:%p"
,
pQInfo
->
qId
,
pQInfo
,
(
void
*
)
curOwner
);
qError
(
"QInfo:
0x%"
PRIx
64
"-%p qhandle is now executed by thread:%p"
,
pQInfo
->
qId
,
pQInfo
,
(
void
*
)
curOwner
);
pQInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
false
;
}
...
...
@@ -215,13 +215,13 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
pQInfo
->
startExecTs
=
taosGetTimestampSec
();
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:
%"
PRIu
64
" it is already killed, abort"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" it is already killed, abort"
,
pQInfo
->
qId
);
return
doBuildResCheck
(
pQInfo
);
}
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
if
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" no table exists for query, abort"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" no table exists for query, abort"
,
pQInfo
->
qId
);
setQueryStatus
(
pRuntimeEnv
,
QUERY_COMPLETED
);
return
doBuildResCheck
(
pQInfo
);
}
...
...
@@ -230,21 +230,21 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
int32_t
ret
=
setjmp
(
pQInfo
->
runtimeEnv
.
env
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pQInfo
->
code
=
ret
;
qDebug
(
"QInfo:
%"
PRIu
64
" query abort due to error/cancel occurs, code:%s"
,
pQInfo
->
qId
,
tstrerror
(
pQInfo
->
code
));
qDebug
(
"QInfo:
0x%"
PRIx
64
" query abort due to error/cancel occurs, code:%s"
,
pQInfo
->
qId
,
tstrerror
(
pQInfo
->
code
));
return
doBuildResCheck
(
pQInfo
);
}
qDebug
(
"QInfo:
%"
PRIu
64
" query task is launched"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" query task is launched"
,
pQInfo
->
qId
);
pRuntimeEnv
->
outputBuf
=
pRuntimeEnv
->
proot
->
exec
(
pRuntimeEnv
->
proot
);
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:
%"
PRIu
64
" query is killed"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" query is killed"
,
pQInfo
->
qId
);
}
else
if
(
GET_NUM_OF_RESULTS
(
pRuntimeEnv
)
==
0
)
{
qDebug
(
"QInfo:
%"
PRIu
64
" over, %u tables queried, %"
PRId64
" rows are returned"
,
pQInfo
->
qId
,
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" over, %u tables queried, %"
PRId64
" rows are returned"
,
pQInfo
->
qId
,
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
,
pRuntimeEnv
->
resultInfo
.
total
);
}
else
{
qDebug
(
"QInfo:
%"
PRIu
64
" query paused, %d rows returned, numOfTotal:%"
PRId64
" rows"
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" query paused, %d rows returned, numOfTotal:%"
PRId64
" rows"
,
pQInfo
->
qId
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
pRuntimeEnv
->
resultInfo
.
total
+
GET_NUM_OF_RESULTS
(
pRuntimeEnv
));
}
...
...
@@ -255,13 +255,13 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
qError
(
"QInfo:
%"
PRIu
64
" invalid qhandle"
,
pQInfo
->
qId
);
qError
(
"QInfo:
0x%"
PRIx
64
" invalid qhandle"
,
pQInfo
->
qId
);
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
*
buildRes
=
false
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
qDebug
(
"QInfo:
%"
PRIu
64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
return
pQInfo
->
code
;
}
...
...
@@ -281,11 +281,11 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
assert
(
pQInfo
->
rspContext
==
NULL
);
if
(
pQInfo
->
dataReady
==
QUERY_RESULT_READY
)
{
*
buildRes
=
true
;
qDebug
(
"QInfo:
%"
PRIu
64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQuery
->
resultRowSize
,
qDebug
(
"QInfo:
0x%"
PRIx
64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQuery
->
resultRowSize
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
tstrerror
(
pQInfo
->
code
));
}
else
{
*
buildRes
=
false
;
qDebug
(
"QInfo:
%"
PRIu
64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
pQInfo
->
rspContext
=
pRspContext
;
assert
(
pQInfo
->
rspContext
!=
NULL
);
}
...
...
@@ -344,10 +344,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
*
continueExec
=
false
;
(
*
pRsp
)
->
completed
=
1
;
// notify no more result to client
qDebug
(
"QInfo:
%"
PRIu
64
" no more results to retrieve"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" no more results to retrieve"
,
pQInfo
->
qId
);
}
else
{
*
continueExec
=
true
;
qDebug
(
"QInfo:
%"
PRIu
64
" has more results to retrieve"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" has more results to retrieve"
,
pQInfo
->
qId
);
}
// the memory should be freed if the code of pQInfo is not TSDB_CODE_SUCCESS
...
...
@@ -373,7 +373,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"QInfo:
%"
PRIu
64
" query killed"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" query killed"
,
pQInfo
->
qId
);
setQueryKilled
(
pQInfo
);
// Wait for the query executing thread being stopped/
...
...
@@ -401,7 +401,7 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
return
;
}
qDebug
(
"QInfo:
%"
PRIu
64
" query completed"
,
pQInfo
->
qId
);
qDebug
(
"QInfo:
0x%"
PRIx
64
" query completed"
,
pQInfo
->
qId
);
queryCostStatis
(
pQInfo
);
// print the query cost summary
freeQInfo
(
pQInfo
);
}
...
...
@@ -484,7 +484,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo) {
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QInfo:
%"
PRIu
64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
qError
(
"QInfo:
0x%"
PRIx
64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
...
...
@@ -492,7 +492,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo) {
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
qError
(
"QInfo:
%"
PRIu
64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
qError
(
"QInfo:
0x%"
PRIx
64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
else
{
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
e8bc9be6
...
...
@@ -286,7 +286,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
}
taosArrayPush
(
pTableCheckInfo
,
&
info
);
tsdbDebug
(
"%p check table uid:%"
PRId64
", tid:%d from lastKey:%"
PRId64
"
%"
PRIu
64
,
pQueryHandle
,
info
.
tableId
.
uid
,
tsdbDebug
(
"%p check table uid:%"
PRId64
", tid:%d from lastKey:%"
PRId64
"
0x%"
PRIx
64
,
pQueryHandle
,
info
.
tableId
.
uid
,
info
.
tableId
.
tid
,
info
.
lastKey
,
pQueryHandle
->
qId
);
}
}
...
...
@@ -440,7 +440,7 @@ TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STable
tsdbMayTakeMemSnapshot
(
pQueryHandle
,
psTable
);
tsdbDebug
(
"%p total numOfTable:%"
PRIzu
" in query,
%"
PRIu
64
,
pQueryHandle
,
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
),
pQueryHandle
->
qId
);
tsdbDebug
(
"%p total numOfTable:%"
PRIzu
" in query,
0x%"
PRIx
64
,
pQueryHandle
,
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
),
pQueryHandle
->
qId
);
return
(
TsdbQueryHandleT
)
pQueryHandle
;
}
...
...
@@ -651,7 +651,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SDataRow
row
=
(
SDataRow
)
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
dataRowKey
(
row
);
// first timestamp in buffer
tsdbDebug
(
"%p uid:%"
PRId64
", tid:%d check data in mem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
", lastKey:%"
PRId64
", numOfRows:%"
PRId64
",
%"
PRIu
64
,
"-%"
PRId64
", lastKey:%"
PRId64
", numOfRows:%"
PRId64
",
0x%"
PRIx
64
,
pHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
key
,
order
,
pMem
->
keyFirst
,
pMem
->
keyLast
,
pCheckInfo
->
lastKey
,
pMem
->
numOfRows
,
pHandle
->
qId
);
...
...
@@ -662,7 +662,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
}
}
else
{
tsdbDebug
(
"%p uid:%"
PRId64
", tid:%d no data in mem,
%"
PRIu
64
,
pHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
tsdbDebug
(
"%p uid:%"
PRId64
", tid:%d no data in mem,
0x%"
PRIx
64
,
pHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
pHandle
->
qId
);
}
...
...
@@ -673,7 +673,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SDataRow
row
=
(
SDataRow
)
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
dataRowKey
(
row
);
// first timestamp in buffer
tsdbDebug
(
"%p uid:%"
PRId64
", tid:%d check data in imem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
", lastKey:%"
PRId64
", numOfRows:%"
PRId64
",
%"
PRIu
64
,
"-%"
PRId64
", lastKey:%"
PRId64
", numOfRows:%"
PRId64
",
0x%"
PRIx
64
,
pHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
key
,
order
,
pIMem
->
keyFirst
,
pIMem
->
keyLast
,
pCheckInfo
->
lastKey
,
pIMem
->
numOfRows
,
pHandle
->
qId
);
...
...
@@ -683,7 +683,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
assert
(
pCheckInfo
->
lastKey
>=
key
);
}
}
else
{
tsdbDebug
(
"%p uid:%"
PRId64
", tid:%d no data in imem,
%"
PRIu
64
,
pHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
tsdbDebug
(
"%p uid:%"
PRId64
", tid:%d no data in imem,
0x%"
PRIx
64
,
pHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
pHandle
->
qId
);
}
...
...
@@ -811,7 +811,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
}
pCheckInfo
->
lastKey
=
dataRowKey
(
row
);
// first timestamp in buffer
tsdbDebug
(
"%p uid:%"
PRId64
", tid:%d check data in buffer from skey:%"
PRId64
", order:%d,
%"
PRIu
64
,
pHandle
,
tsdbDebug
(
"%p uid:%"
PRId64
", tid:%d check data in buffer from skey:%"
PRId64
", order:%d,
0x%"
PRIx
64
,
pHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
pCheckInfo
->
lastKey
,
pHandle
->
order
,
pHandle
->
qId
);
// all data in mem are checked already.
...
...
@@ -986,21 +986,21 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBloc
STSchema
*
pSchema
=
tsdbGetTableSchema
(
pCheckInfo
->
pTableObj
);
int32_t
code
=
tdInitDataCols
(
pQueryHandle
->
pDataCols
,
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbError
(
"%p failed to malloc buf for pDataCols,
%"
PRIu
64
,
pQueryHandle
,
pQueryHandle
->
qId
);
tsdbError
(
"%p failed to malloc buf for pDataCols,
0x%"
PRIx
64
,
pQueryHandle
,
pQueryHandle
->
qId
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_error
;
}
code
=
tdInitDataCols
(
pQueryHandle
->
rhelper
.
pDCols
[
0
],
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbError
(
"%p failed to malloc buf for rhelper.pDataCols[0],
%"
PRIu
64
,
pQueryHandle
,
pQueryHandle
->
qId
);
tsdbError
(
"%p failed to malloc buf for rhelper.pDataCols[0],
0x%"
PRIx
64
,
pQueryHandle
,
pQueryHandle
->
qId
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_error
;
}
code
=
tdInitDataCols
(
pQueryHandle
->
rhelper
.
pDCols
[
1
],
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbError
(
"%p failed to malloc buf for rhelper.pDataCols[1],
%"
PRIu
64
,
pQueryHandle
,
pQueryHandle
->
qId
);
tsdbError
(
"%p failed to malloc buf for rhelper.pDataCols[1],
0x%"
PRIx
64
,
pQueryHandle
,
pQueryHandle
->
qId
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_error
;
}
...
...
@@ -1036,14 +1036,14 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBloc
int64_t
elapsedTime
=
(
taosGetTimestampUs
()
-
st
);
pQueryHandle
->
cost
.
blockLoadTime
+=
elapsedTime
;
tsdbDebug
(
"%p load file block into buffer, index:%d, brange:%"
PRId64
"-%"
PRId64
", rows:%d, elapsed time:%"
PRId64
" us,
%"
PRIu
64
,
tsdbDebug
(
"%p load file block into buffer, index:%d, brange:%"
PRId64
"-%"
PRId64
", rows:%d, elapsed time:%"
PRId64
" us,
0x%"
PRIx
64
,
pQueryHandle
,
slotIndex
,
pBlock
->
keyFirst
,
pBlock
->
keyLast
,
pBlock
->
numOfRows
,
elapsedTime
,
pQueryHandle
->
qId
);
return
TSDB_CODE_SUCCESS
;
_error:
pBlock
->
numOfRows
=
0
;
tsdbError
(
"%p error occurs in loading file block, index:%d, brange:%"
PRId64
"-%"
PRId64
", rows:%d,
%"
PRIu
64
,
tsdbError
(
"%p error occurs in loading file block, index:%d, brange:%"
PRId64
"-%"
PRId64
", rows:%d,
0x%"
PRIx
64
,
pQueryHandle
,
slotIndex
,
pBlock
->
keyFirst
,
pBlock
->
keyLast
,
pBlock
->
numOfRows
,
pQueryHandle
->
qId
);
return
terrno
;
}
...
...
@@ -1066,7 +1066,7 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p
assert
(
cur
->
pos
>=
0
&&
cur
->
pos
<=
binfo
.
rows
);
TSKEY
key
=
(
row
!=
NULL
)
?
dataRowKey
(
row
)
:
TSKEY_INITIAL_VAL
;
tsdbDebug
(
"%p key in mem:%"
PRId64
",
%"
PRIu
64
,
pQueryHandle
,
key
,
pQueryHandle
->
qId
);
tsdbDebug
(
"%p key in mem:%"
PRId64
",
0x%"
PRIx
64
,
pQueryHandle
,
key
,
pQueryHandle
->
qId
);
if
((
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<=
binfo
.
window
.
ekey
))
||
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>=
binfo
.
window
.
skey
)))
{
...
...
@@ -1551,7 +1551,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
updateInfoAfterMerge
(
pQueryHandle
,
pCheckInfo
,
numOfRows
,
pos
);
doCheckGeneratedBlockRange
(
pQueryHandle
);
tsdbDebug
(
"%p uid:%"
PRIu64
",tid:%d data block created, mixblock:%d, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d,
%"
PRIu
64
,
tsdbDebug
(
"%p uid:%"
PRIu64
",tid:%d data block created, mixblock:%d, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d,
0x%"
PRIx
64
,
pQueryHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
cur
->
mixBlock
,
cur
->
win
.
skey
,
cur
->
win
.
ekey
,
cur
->
rows
,
pQueryHandle
->
qId
);
}
...
...
@@ -1605,7 +1605,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t
endPos
=
getEndPosInDataBlock
(
pQueryHandle
,
&
blockInfo
);
tsdbDebug
(
"%p uid:%"
PRIu64
",tid:%d start merge data block, file block range:%"
PRIu64
"-%"
PRIu64
" rows:%d, start:%d,"
"end:%d,
%"
PRIu
64
,
"end:%d,
0x%"
PRIx
64
,
pQueryHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
blockInfo
.
window
.
skey
,
blockInfo
.
window
.
ekey
,
blockInfo
.
rows
,
cur
->
pos
,
endPos
,
pQueryHandle
->
qId
);
...
...
@@ -1747,7 +1747,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
updateInfoAfterMerge
(
pQueryHandle
,
pCheckInfo
,
numOfRows
,
pos
);
doCheckGeneratedBlockRange
(
pQueryHandle
);
tsdbDebug
(
"%p uid:%"
PRIu64
",tid:%d data block created, mixblock:%d, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d,
%"
PRIu
64
,
tsdbDebug
(
"%p uid:%"
PRIu64
",tid:%d data block created, mixblock:%d, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d,
0x%"
PRIx
64
,
pQueryHandle
,
pCheckInfo
->
tableId
.
uid
,
pCheckInfo
->
tableId
.
tid
,
cur
->
mixBlock
,
cur
->
win
.
skey
,
cur
->
win
.
ekey
,
cur
->
rows
,
pQueryHandle
->
qId
);
}
...
...
@@ -1923,12 +1923,12 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
memcpy
(
pQueryHandle
->
pDataBlockInfo
,
sup
.
pDataBlockInfo
[
0
],
sizeof
(
STableBlockInfo
)
*
numOfBlocks
);
cleanBlockOrderSupporter
(
&
sup
,
numOfQualTables
);
tsdbDebug
(
"%p create data blocks info struct completed for 1 table, %d blocks not sorted
%"
PRIu
64
,
pQueryHandle
,
cnt
,
tsdbDebug
(
"%p create data blocks info struct completed for 1 table, %d blocks not sorted
0x%"
PRIx
64
,
pQueryHandle
,
cnt
,
pQueryHandle
->
qId
);
return
TSDB_CODE_SUCCESS
;
}
tsdbDebug
(
"%p create data blocks info struct completed, %d blocks in %d tables
%"
PRIu
64
,
pQueryHandle
,
cnt
,
tsdbDebug
(
"%p create data blocks info struct completed, %d blocks in %d tables
0x%"
PRIx
64
,
pQueryHandle
,
cnt
,
numOfQualTables
,
pQueryHandle
->
qId
);
assert
(
cnt
<=
numOfBlocks
&&
numOfQualTables
<=
numOfTables
);
// the pTableQueryInfo[j]->numOfBlocks may be 0
...
...
@@ -1965,7 +1965,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
* }
*/
tsdbDebug
(
"%p %d data blocks sort completed,
%"
PRIu
64
,
pQueryHandle
,
cnt
,
pQueryHandle
->
qId
);
tsdbDebug
(
"%p %d data blocks sort completed,
0x%"
PRIx
64
,
pQueryHandle
,
cnt
,
pQueryHandle
->
qId
);
cleanBlockOrderSupporter
(
&
sup
,
numOfTables
);
free
(
pTree
);
...
...
@@ -2023,7 +2023,7 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
if
((
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
win
.
skey
>
pQueryHandle
->
window
.
ekey
)
||
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
win
.
ekey
<
pQueryHandle
->
window
.
ekey
))
{
tsdbUnLockFS
(
REPO_FS
(
pQueryHandle
->
pTsdb
));
tsdbDebug
(
"%p remain files are not qualified for qrange:%"
PRId64
"-%"
PRId64
", ignore,
%"
PRIu
64
,
pQueryHandle
,
tsdbDebug
(
"%p remain files are not qualified for qrange:%"
PRId64
"-%"
PRId64
", ignore,
0x%"
PRIx
64
,
pQueryHandle
,
pQueryHandle
->
window
.
skey
,
pQueryHandle
->
window
.
ekey
,
pQueryHandle
->
qId
);
pQueryHandle
->
pFileGroup
=
NULL
;
assert
(
pQueryHandle
->
numOfBlocks
==
0
);
...
...
@@ -2047,7 +2047,7 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
break
;
}
tsdbDebug
(
"%p %d blocks found in file for %d table(s), fid:%d,
%"
PRIu
64
,
pQueryHandle
,
numOfBlocks
,
numOfTables
,
tsdbDebug
(
"%p %d blocks found in file for %d table(s), fid:%d,
0x%"
PRIx
64
,
pQueryHandle
,
numOfBlocks
,
numOfTables
,
pQueryHandle
->
pFileGroup
->
fid
,
pQueryHandle
->
qId
);
assert
(
numOfBlocks
>=
0
);
...
...
@@ -2139,7 +2139,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
if
((
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
win
.
skey
>
pQueryHandle
->
window
.
ekey
)
||
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
win
.
ekey
<
pQueryHandle
->
window
.
ekey
))
{
tsdbUnLockFS
(
REPO_FS
(
pQueryHandle
->
pTsdb
));
tsdbDebug
(
"%p remain files are not qualified for qrange:%"
PRId64
"-%"
PRId64
", ignore,
%"
PRIu
64
,
pQueryHandle
,
tsdbDebug
(
"%p remain files are not qualified for qrange:%"
PRId64
"-%"
PRId64
", ignore,
0x%"
PRIx
64
,
pQueryHandle
,
pQueryHandle
->
window
.
skey
,
pQueryHandle
->
window
.
ekey
,
pQueryHandle
->
qId
);
pQueryHandle
->
pFileGroup
=
NULL
;
break
;
...
...
@@ -2163,7 +2163,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
break
;
}
tsdbDebug
(
"%p %d blocks found in file for %d table(s), fid:%d,
%"
PRIu
64
,
pQueryHandle
,
numOfBlocks
,
numOfTables
,
tsdbDebug
(
"%p %d blocks found in file for %d table(s), fid:%d,
0x%"
PRIx
64
,
pQueryHandle
,
numOfBlocks
,
numOfTables
,
pQueryHandle
->
pFileGroup
->
fid
,
pQueryHandle
->
qId
);
if
(
numOfBlocks
==
0
)
{
...
...
@@ -2211,7 +2211,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
if
((
!
cur
->
mixBlock
)
||
cur
->
blockCompleted
)
{
// all data blocks in current file has been checked already, try next file if exists
}
else
{
tsdbDebug
(
"%p continue in current data block, index:%d, pos:%d,
%"
PRIu
64
,
pQueryHandle
,
cur
->
slot
,
cur
->
pos
,
tsdbDebug
(
"%p continue in current data block, index:%d, pos:%d,
0x%"
PRIx
64
,
pQueryHandle
,
cur
->
slot
,
cur
->
pos
,
pQueryHandle
->
qId
);
int32_t
code
=
handleDataMergeIfNeeded
(
pQueryHandle
,
pBlockInfo
->
compBlock
,
pCheckInfo
);
*
exists
=
(
pQueryHandle
->
realNumOfRows
>
0
);
...
...
@@ -2340,7 +2340,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
int64_t
elapsedTime
=
taosGetTimestampUs
()
-
st
;
tsdbDebug
(
"%p build data block from cache completed, elapsed time:%"
PRId64
" us, numOfRows:%d, numOfCols:%d,
%"
PRIu
64
,
pQueryHandle
,
tsdbDebug
(
"%p build data block from cache completed, elapsed time:%"
PRId64
" us, numOfRows:%d, numOfCols:%d,
0x%"
PRIx
64
,
pQueryHandle
,
elapsedTime
,
numOfRows
,
numOfCols
,
pQueryHandle
->
qId
);
return
numOfRows
;
...
...
@@ -3396,7 +3396,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
pQueryHandle
->
next
=
doFreeColumnInfoData
(
pQueryHandle
->
next
);
SIOCostSummary
*
pCost
=
&
pQueryHandle
->
cost
;
tsdbDebug
(
"%p :io-cost summary: statis-info:%"
PRId64
" us, datablock:%"
PRId64
" us, check data:%"
PRId64
" us,
%"
PRIu
64
,
tsdbDebug
(
"%p :io-cost summary: statis-info:%"
PRId64
" us, datablock:%"
PRId64
" us, check data:%"
PRId64
" us,
0x%"
PRIx
64
,
pQueryHandle
,
pCost
->
statisInfoLoadTime
,
pCost
->
blockLoadTime
,
pCost
->
checkForNextTime
,
pQueryHandle
->
qId
);
tfree
(
pQueryHandle
);
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
e8bc9be6
...
...
@@ -183,7 +183,7 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, uint64_t qId, v
}
}
else
{
*
freeHandle
=
true
;
vTrace
(
"QInfo:
%"
PRIu
64
"-%p exec completed, free handle:%d"
,
qId
,
*
handle
,
*
freeHandle
);
vTrace
(
"QInfo:
0x%"
PRIx
64
"-%p exec completed, free handle:%d"
,
qId
,
*
handle
,
*
freeHandle
);
}
}
else
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
...
...
@@ -244,7 +244,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if
(
handle
==
NULL
)
{
// failed to register qhandle
pRsp
->
code
=
terrno
;
terrno
=
0
;
vError
(
"vgId:%d, QInfo:
%"
PRIu
64
"-%p register qhandle failed, return to app, code:%s"
,
pVnode
->
vgId
,
qId
,
(
void
*
)
pQInfo
,
vError
(
"vgId:%d, QInfo:
0x%"
PRIx
64
"-%p register qhandle failed, return to app, code:%s"
,
pVnode
->
vgId
,
qId
,
(
void
*
)
pQInfo
,
tstrerror
(
pRsp
->
code
));
qDestroyQueryInfo
(
pQInfo
);
// destroy it directly
return
pRsp
->
code
;
...
...
@@ -255,7 +255,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if
(
handle
!=
NULL
&&
vnodeNotifyCurrentQhandle
(
pRead
->
rpcHandle
,
qId
,
*
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:
%"
PRIu
64
"-%p, query discarded since link is broken, %p"
,
pVnode
->
vgId
,
qId
,
*
handle
,
vError
(
"vgId:%d, QInfo:
0x%"
PRIx
64
"-%p, query discarded since link is broken, %p"
,
pVnode
->
vgId
,
qId
,
*
handle
,
pRead
->
rpcHandle
);
pRsp
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
...
...
@@ -266,7 +266,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
if
(
handle
!=
NULL
)
{
vTrace
(
"vgId:%d, QInfo:
%"
PRIu
64
"-%p, dnode query msg disposed, create qhandle and returns to app"
,
vgId
,
qId
,
*
handle
);
vTrace
(
"vgId:%d, QInfo:
0x%"
PRIx
64
"-%p, dnode query msg disposed, create qhandle and returns to app"
,
vgId
,
qId
,
*
handle
);
code
=
vnodePutItemIntoReadQueue
(
pVnode
,
handle
,
pRead
->
rpcHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRsp
->
code
=
code
;
...
...
@@ -331,7 +331,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
pRetrieve
->
qId
=
htobe64
(
pRetrieve
->
qId
);
vTrace
(
"vgId:%d, qId:
%"
PRIu
64
", retrieve msg is disposed, free:%d, conn:%p"
,
pVnode
->
vgId
,
pRetrieve
->
qId
,
vTrace
(
"vgId:%d, qId:
0x%"
PRIx
64
", retrieve msg is disposed, free:%d, conn:%p"
,
pVnode
->
vgId
,
pRetrieve
->
qId
,
pRetrieve
->
free
,
pRead
->
rpcHandle
);
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
...
...
@@ -414,7 +414,7 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int
pMsg
->
header
.
vgId
=
htonl
(
vgId
);
pMsg
->
header
.
contLen
=
htonl
(
sizeof
(
SRetrieveTableMsg
));
vTrace
(
"QInfo:
%"
PRIu
64
"-%p register qhandle to connect:%p"
,
qId
,
qhandle
,
handle
);
vTrace
(
"QInfo:
0x%"
PRIx
64
"-%p register qhandle to connect:%p"
,
qId
,
qhandle
,
handle
);
return
rpcReportProgress
(
handle
,
(
char
*
)
pMsg
,
sizeof
(
SRetrieveTableMsg
));
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录