Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
f14e986e
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f14e986e
编写于
6月 01, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'haoyifan/vnode_read' into develop
上级
20e77861
f38f6917
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
96 addition
and
86 deletion
+96
-86
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+86
-79
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+10
-7
未找到文件。
src/query/src/qExecutor.c
浏览文件 @
f14e986e
...
...
@@ -117,7 +117,7 @@ static int32_t flushFromResultBuf(SQInfo *pQInfo);
bool
doFilterData
(
SQuery
*
pQuery
,
int32_t
elemPos
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfFilterCols
;
++
k
)
{
SSingleColumnFilterInfo
*
pFilterInfo
=
&
pQuery
->
pFilterInfo
[
k
];
char
*
pElem
=
pFilterInfo
->
pData
+
pFilterInfo
->
info
.
bytes
*
elemPos
;
if
(
isNull
(
pElem
,
pFilterInfo
->
info
.
type
))
{
return
false
;
...
...
@@ -126,7 +126,7 @@ bool doFilterData(SQuery *pQuery, int32_t elemPos) {
bool
qualified
=
false
;
for
(
int32_t
j
=
0
;
j
<
pFilterInfo
->
numOfFilters
;
++
j
)
{
SColumnFilterElem
*
pFilterElem
=
&
pFilterInfo
->
pFilters
[
j
];
if
(
pFilterElem
->
fp
(
pFilterElem
,
pElem
,
pElem
))
{
qualified
=
true
;
break
;
...
...
@@ -765,7 +765,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
SArray
*
pDataBlock
)
{
char
*
dataBlock
=
NULL
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
col
].
base
.
functionId
;
...
...
@@ -778,18 +778,18 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
}
else
{
pCtx
->
startOffset
=
pQuery
->
pos
-
(
size
-
1
);
}
sas
->
offset
=
0
;
sas
->
colList
=
pQuery
->
colList
;
sas
->
numOfCols
=
pQuery
->
numOfCols
;
sas
->
data
=
calloc
(
pQuery
->
numOfCols
,
POINTER_BYTES
);
// here the pQuery->colList and sas->colList are identical
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
SColumnInfo
*
pColMsg
=
&
pQuery
->
colList
[
i
];
int32_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
);
dataBlock
=
NULL
;
for
(
int32_t
k
=
0
;
k
<
numOfCols
;
++
k
)
{
//todo refactor
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
,
k
);
...
...
@@ -798,7 +798,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
break
;
}
}
assert
(
dataBlock
!=
NULL
);
sas
->
data
[
i
]
=
dataBlock
/* + pQuery->colList[i].bytes*/
;
// start from the offset
}
...
...
@@ -838,7 +838,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
SArithmeticSupport
*
sasArray
=
calloc
((
size_t
)
pQuery
->
numOfOutput
,
sizeof
(
SArithmeticSupport
));
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
char
*
dataBlock
=
getDataBlock
(
pRuntimeEnv
,
&
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
tsCols
,
pDataBlockInfo
,
pStatis
,
&
sasArray
[
k
],
k
);
...
...
@@ -901,10 +901,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if
(
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
!=
TSDB_FUNC_ARITHM
)
{
continue
;
}
tfree
(
sasArray
[
i
].
data
);
}
tfree
(
sasArray
);
}
...
...
@@ -964,7 +964,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
* column in cache with the corresponding meter schema is reinforced.
*/
int32_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
,
i
);
if
(
pColIndex
->
colId
==
p
->
info
.
colId
)
{
...
...
@@ -972,7 +972,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
}
}
}
return
NULL
;
}
...
...
@@ -1037,7 +1037,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
static
void
rowwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
SWindowResInfo
*
pWindowResInfo
,
SArray
*
pDataBlock
)
{
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
item
=
pQuery
->
current
;
...
...
@@ -1170,10 +1170,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if
(
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
!=
TSDB_FUNC_ARITHM
)
{
continue
;
}
tfree
(
sasArray
[
i
].
data
);
}
free
(
sasArray
);
}
...
...
@@ -1362,7 +1362,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
SColIndex
*
pIndex
=
&
pSqlFuncMsg
->
colInfo
;
int32_t
index
=
pSqlFuncMsg
->
colInfo
.
colIndex
;
if
(
TSDB_COL_IS_TAG
(
pIndex
->
flag
))
{
if
(
pIndex
->
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
// todo refactor
...
...
@@ -1443,7 +1443,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pRuntimeEnv
);
qTrace
(
"QInfo:%p teardown runtime env"
,
pQInfo
);
cleanupTimeWindowInfo
(
&
pRuntimeEnv
->
windowResInfo
,
pQuery
->
numOfOutput
);
...
...
@@ -1485,7 +1485,7 @@ static bool isQueryKilled(SQInfo *pQInfo) {
pQInfo->killed = 1;
return true;
}
return (pQInfo->killed == 1);
#endif
}
...
...
@@ -1591,7 +1591,7 @@ static bool onlyQueryTags(SQuery* pQuery) {
return
false
;
}
}
return
true
;
}
...
...
@@ -1893,14 +1893,14 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
qTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
pMeter->meterId, pMeter->numOfQueries);
num++;
}
}
/*
* in order to reduce log output, for all meters of which numOfQueries count are 0,
* we do not output corresponding information
...
...
@@ -1922,26 +1922,26 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
int32_t colIndex = pFilterInfo->info.colIndex;
// this column not valid in current data block
if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) {
continue;
}
// not support pre-filter operation on binary/nchar data type
if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) {
continue;
}
// all points in current column are NULL, no need to check its boundary value
if (pDataStatis[colIndex].numOfNull == numOfTotalPoints) {
continue;
}
if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) {
float minval = *(double *)(&pDataStatis[colIndex].min);
float maxval = *(double *)(&pDataStatis[colIndex].max);
for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) {
if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) {
return true;
...
...
@@ -1956,7 +1956,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
}
}
}
// todo disable this opt code block temporarily
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].base.functionId;
...
...
@@ -2392,7 +2392,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
numOfCols
=
pQuery
->
numOfOutput
;
printf
(
"super table query intermediate result, total:%d
\n
"
,
numOfRows
);
for
(
int32_t
j
=
0
;
j
<
numOfRows
;
++
j
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
...
...
@@ -3534,7 +3534,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SWindowResInfo
*
pWindowResInfo
=
&
pTableQueryInfo
->
windowResInfo
;
pQuery
->
pos
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
pDataBlockInfo
->
rows
-
1
;
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
rowwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pWindowResInfo
,
pDataBlock
);
}
else
{
...
...
@@ -4110,11 +4110,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
if
(
pInfo
->
id
.
tid
==
blockInfo
.
tid
)
{
assert
(
pInfo
->
id
.
uid
==
blockInfo
.
uid
);
pTableQueryInfo
=
item
->
info
;
break
;
}
}
if
(
pTableQueryInfo
!=
NULL
)
{
break
;
}
...
...
@@ -4182,11 +4182,11 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pQueryHandle
);
pRuntimeEnv
->
pQueryHandle
=
NULL
;
}
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
);
taosArrayDestroy
(
tx
);
taosArrayDestroy
(
g1
);
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
if
(
pRuntimeEnv
->
cur
.
vgroupIndex
==
-
1
)
{
int64_t
tag
=
pRuntimeEnv
->
pCtx
[
0
].
tag
.
i64Key
;
...
...
@@ -4859,22 +4859,22 @@ static void stableQueryImpl(SQInfo *pQInfo) {
static
int32_t
getColumnIndexInSource
(
SQueryTableMsg
*
pQueryMsg
,
SSqlFuncMsg
*
pExprMsg
,
SColumnInfo
*
pTagCols
)
{
int32_t
j
=
0
;
if
(
TSDB_COL_IS_TAG
(
pExprMsg
->
colInfo
.
flag
))
{
while
(
j
<
pQueryMsg
->
numOfTags
)
{
if
(
pExprMsg
->
colInfo
.
colId
==
pTagCols
[
j
].
colId
)
{
return
j
;
}
j
+=
1
;
}
}
else
{
while
(
j
<
pQueryMsg
->
numOfCols
)
{
if
(
pExprMsg
->
colInfo
.
colId
==
pQueryMsg
->
colList
[
j
].
colId
)
{
return
j
;
}
j
+=
1
;
}
}
...
...
@@ -4923,7 +4923,7 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
}
}
}
return
true
;
}
...
...
@@ -5065,10 +5065,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pExprMsg
=
(
SSqlFuncMsg
*
)
pMsg
;
}
if
(
!
validateQuerySourceCols
(
pQueryMsg
,
*
pExpr
))
{
tfree
(
*
pExpr
);
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
...
...
@@ -5111,12 +5111,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
(
*
tagCols
)
=
calloc
(
1
,
sizeof
(
SColumnInfo
)
*
pQueryMsg
->
numOfTags
);
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfTags
;
++
i
)
{
SColumnInfo
*
pTagCol
=
(
SColumnInfo
*
)
pMsg
;
pTagCol
->
colId
=
htons
(
pTagCol
->
colId
);
pTagCol
->
bytes
=
htons
(
pTagCol
->
bytes
);
pTagCol
->
type
=
htons
(
pTagCol
->
type
);
pTagCol
->
numOfFilters
=
0
;
(
*
tagCols
)[
i
]
=
*
pTagCol
;
pMsg
+=
sizeof
(
SColumnInfo
);
}
...
...
@@ -5128,14 +5128,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
memcpy
(
*
tagCond
,
pMsg
,
pQueryMsg
->
tagCondLen
);
pMsg
+=
pQueryMsg
->
tagCondLen
;
}
if
(
*
pMsg
!=
0
)
{
size_t
len
=
strlen
(
pMsg
)
+
1
;
*
tbnameCond
=
malloc
(
len
);
strcpy
(
*
tbnameCond
,
pMsg
);
pMsg
+=
len
;
}
qTrace
(
"qmsg:%p query %d tables, qrange:%"
PRId64
"-%"
PRId64
", numOfGroupbyTagCols:%d, order:%d, "
"outputCols:%d, numOfCols:%d, interval:%"
PRId64
", fillType:%d, comptsLen:%d, limit:%"
PRId64
", offset:%"
PRId64
,
pQueryMsg
,
pQueryMsg
->
numOfTables
,
pQueryMsg
->
window
.
skey
,
pQueryMsg
->
window
.
ekey
,
pQueryMsg
->
numOfGroupCols
,
...
...
@@ -5160,7 +5160,7 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable
qError
(
"qmsg:%p failed to create arithmetic expression string from:%s"
,
pQueryMsg
,
pArithExprInfo
->
base
.
arg
[
0
].
argValue
.
pz
);
return
TSDB_CODE_APP_ERROR
;
}
pArithExprInfo
->
pExpr
=
pExprNode
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -5225,7 +5225,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfOutput
;
++
i
)
{
pExprs
[
i
].
base
=
*
pExprMsg
[
i
];
int16_t
functId
=
pExprs
[
i
].
base
.
functionId
;
if
(
functId
==
TSDB_FUNC_TOP
||
functId
==
TSDB_FUNC_BOTTOM
)
{
int32_t
j
=
getColumnIndexInSource
(
pQueryMsg
,
&
pExprs
[
i
].
base
,
pTagCols
);
assert
(
j
<
pQueryMsg
->
numOfCols
);
...
...
@@ -5265,7 +5265,7 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfGroupCols
;
++
i
)
{
taosArrayPush
(
pGroupbyExpr
->
columnInfo
,
&
pColIndex
[
i
]);
}
return
pGroupbyExpr
;
}
...
...
@@ -5288,7 +5288,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
memcpy
(
&
pFilterInfo
->
info
,
&
pQuery
->
colList
[
i
],
sizeof
(
SColumnInfoData
));
pFilterInfo
->
info
=
pQuery
->
colList
[
i
];
pFilterInfo
->
numOfFilters
=
pQuery
->
colList
[
i
].
numOfFilters
;
pFilterInfo
->
pFilters
=
calloc
(
pFilterInfo
->
numOfFilters
,
sizeof
(
SColumnFilterElem
));
...
...
@@ -5435,9 +5435,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery
->
colList
[
i
]
=
pQueryMsg
->
colList
[
i
];
pQuery
->
colList
[
i
].
filters
=
tscFilterInfoClone
(
pQueryMsg
->
colList
[
i
].
filters
,
pQuery
->
colList
[
i
].
numOfFilters
);
}
pQuery
->
tagColList
=
pTagCols
;
// calculate the result row size
for
(
int16_t
col
=
0
;
col
<
numOfOutput
;
++
col
)
{
assert
(
pExprs
[
col
].
bytes
>
0
);
...
...
@@ -5484,22 +5484,23 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
// to make sure third party won't overwrite this structure
pQInfo
->
signature
=
pQInfo
;
pQInfo
->
tableIdGroupInfo
=
*
groupInfo
;
size_t
numOfGroups
=
taosArrayGetSize
(
groupInfo
->
pGroupList
);
pQInfo
->
groupInfo
.
pGroupList
=
taosArrayInit
(
numOfGroups
,
POINTER_BYTES
);
pQInfo
->
groupInfo
.
numOfTables
=
groupInfo
->
numOfTables
;
int
tableIndex
=
0
;
STimeWindow
window
=
pQueryMsg
->
window
;
taosArraySort
(
pTableIdList
,
compareTableIdInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
pa
=
taosArrayGetP
(
groupInfo
->
pGroupList
,
i
);
size_t
s
=
taosArrayGetSize
(
pa
);
SArray
*
p1
=
taosArrayInit
(
s
,
sizeof
(
SGroupItem
));
for
(
int32_t
j
=
0
;
j
<
s
;
++
j
)
{
STableId
id
=
*
(
STableId
*
)
taosArrayGet
(
pa
,
j
);
SGroupItem
item
=
{
.
id
=
id
};
...
...
@@ -5515,6 +5516,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
item
.
info
->
tableIndex
=
tableIndex
++
;
taosArrayPush
(
p1
,
&
item
);
}
taosArrayPush
(
pQInfo
->
groupInfo
.
pGroupList
,
&
p1
);
}
...
...
@@ -5658,7 +5660,7 @@ static void freeQInfo(SQInfo *pQInfo) {
int32_t
numOfGroups
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
p
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
i
);
size_t
num
=
taosArrayGetSize
(
p
);
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
SGroupItem
*
item
=
taosArrayGet
(
p
,
j
);
...
...
@@ -5666,17 +5668,17 @@ static void freeQInfo(SQInfo *pQInfo) {
destroyTableQueryInfo
(
item
->
info
,
pQuery
->
numOfOutput
);
}
}
taosArrayDestroy
(
p
);
}
taosArrayDestroy
(
pQInfo
->
groupInfo
.
pGroupList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
p
=
taosArrayGetP
(
pQInfo
->
tableIdGroupInfo
.
pGroupList
,
i
);
taosArrayDestroy
(
p
);
}
taosArrayDestroy
(
pQInfo
->
tableIdGroupInfo
.
pGroupList
);
taosArrayDestroy
(
pQInfo
->
arrTableIdInfo
);
...
...
@@ -5684,14 +5686,14 @@ static void freeQInfo(SQInfo *pQInfo) {
taosArrayDestroy
(
pQuery
->
pGroupbyExpr
->
columnInfo
);
tfree
(
pQuery
->
pGroupbyExpr
);
}
tfree
(
pQuery
->
tagColList
);
tfree
(
pQuery
->
pFilterInfo
);
tfree
(
pQuery
->
colList
);
tfree
(
pQuery
->
sdata
);
tfree
(
pQuery
);
qTrace
(
"QInfo:%p QInfo is freed"
,
pQInfo
);
// destroy signature, in order to avoid the query process pass the object safety check
...
...
@@ -5744,7 +5746,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
qError
(
"QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s"
,
pQInfo
,
pQuery
->
sdata
[
0
]
->
data
,
strerror
(
errno
));
}
// all data returned, set query over
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
setQueryStatus
(
pQuery
,
QUERY_OVER
);
...
...
@@ -5861,7 +5863,12 @@ _over:
tfree
(
tagCond
);
tfree
(
tbnameCond
);
taosArrayDestroy
(
pTableIdList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
*
pQInfo
);
*
pQInfo
=
NULL
;
}
// if failed to add ref for all meters in this query, abort current query
return
code
;
}
...
...
@@ -5885,7 +5892,7 @@ void qTableQuery(qinfo_t qinfo) {
}
qTrace
(
"QInfo:%p query task is launched"
,
pQInfo
);
if
(
onlyQueryTags
(
pQInfo
->
runtimeEnv
.
pQuery
))
{
buildTagQueryResult
(
pQInfo
);
// todo support the limit/offset
}
else
if
(
pQInfo
->
runtimeEnv
.
stableQuery
)
{
...
...
@@ -5893,7 +5900,7 @@ void qTableQuery(qinfo_t qinfo) {
}
else
{
tableQueryImpl
(
pQInfo
);
}
sem_post
(
&
pQInfo
->
dataReady
);
// vnodeDecRefCount(pQInfo);
}
...
...
@@ -5987,7 +5994,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
static
void
buildTagQueryResult
(
SQInfo
*
pQInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
size_t
num
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
);
assert
(
num
==
0
||
num
==
1
);
if
(
num
==
0
)
{
...
...
@@ -5996,44 +6003,44 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
SArray
*
pa
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
0
);
num
=
taosArrayGetSize
(
pa
);
assert
(
num
==
pQInfo
->
groupInfo
.
numOfTables
);
int16_t
type
,
bytes
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
0
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TID_TAG
)
{
// return the tags & table Id
assert
(
pQuery
->
numOfOutput
==
1
);
SExprInfo
*
pExprInfo
=
&
pQuery
->
pSelectExpr
[
0
];
int32_t
rsize
=
pExprInfo
->
bytes
;
char
*
data
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SGroupItem
*
item
=
taosArrayGet
(
pa
,
i
);
char
*
output
=
pQuery
->
sdata
[
0
]
->
data
+
i
*
rsize
;
varDataSetLen
(
output
,
rsize
-
VARSTR_HEADER_SIZE
);
output
=
varDataVal
(
output
);
*
(
int64_t
*
)
output
=
item
->
id
.
uid
;
// memory align problem, todo serialize
output
+=
sizeof
(
item
->
id
.
uid
);
*
(
int32_t
*
)
output
=
item
->
id
.
tid
;
output
+=
sizeof
(
item
->
id
.
tid
);
*
(
int32_t
*
)
output
=
pQInfo
->
vgId
;
output
+=
sizeof
(
pQInfo
->
vgId
);
tsdbGetTableTagVal
(
pQInfo
->
tsdb
,
&
item
->
id
,
pExprInfo
->
base
.
colInfo
.
colId
,
&
type
,
&
bytes
,
&
data
);
memcpy
(
output
,
data
,
bytes
);
}
qTrace
(
"QInfo:%p create (tableId, tag) info completed, rows:%d"
,
pQInfo
,
num
);
}
else
{
// return only the tags|table name etc.
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SExprInfo
*
pExprInfo
=
pQuery
->
pSelectExpr
;
SGroupItem
*
item
=
taosArrayGet
(
pa
,
i
);
char
*
data
=
NULL
;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
// todo check the return value, refactor codes
...
...
@@ -6059,7 +6066,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
pQInfo
->
tableIndex
=
pQInfo
->
groupInfo
.
numOfTables
;
qTrace
(
"QInfo:%p create tag values results completed, rows:%d"
,
pQInfo
,
num
);
}
pQuery
->
rec
.
rows
=
num
;
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
}
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
f14e986e
...
...
@@ -39,8 +39,8 @@ void vnodeInitReadFp(void) {
int32_t
vnodeProcessRead
(
void
*
param
,
int
msgType
,
void
*
pCont
,
int32_t
contLen
,
SRspRet
*
ret
)
{
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
param
;
if
(
vnodeProcessReadMsgFp
[
msgType
]
==
NULL
)
return
TSDB_CODE_MSG_NOT_PROCESSED
;
if
(
vnodeProcessReadMsgFp
[
msgType
]
==
NULL
)
return
TSDB_CODE_MSG_NOT_PROCESSED
;
if
(
pVnode
->
status
==
TAOS_VN_STATUS_DELETING
||
pVnode
->
status
==
TAOS_VN_STATUS_CLOSING
)
return
TSDB_CODE_INVALID_VGROUP_ID
;
...
...
@@ -53,26 +53,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
int32_t
code
=
TSDB_CODE_SUCCESS
;
qinfo_t
pQInfo
=
NULL
;
if
(
contLen
!=
0
)
{
pRet
->
code
=
qCreateQueryInfo
(
pVnode
->
tsdb
,
pVnode
->
vgId
,
pQueryTableMsg
,
&
pQInfo
);
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
pQInfo
));
pRsp
->
code
=
pRet
->
code
;
pRet
->
len
=
sizeof
(
SQueryTableRsp
);
pRet
->
rsp
=
pRsp
;
vTrace
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
pVnode
->
vgId
,
pQInfo
);
}
else
{
assert
(
pCont
!=
NULL
);
pQInfo
=
pCont
;
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
}
qTableQuery
(
pQInfo
);
// do execute query
if
(
pQInfo
!=
NULL
)
{
qTableQuery
(
pQInfo
);
// do execute query
}
return
code
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录