Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e6a02f74
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
e6a02f74
编写于
5月 20, 2022
作者:
H
Haojun Liao
提交者:
GitHub
5月 20, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12758 from taosdata/feature/3_liaohj
fix(query): assign the group id in the ssdatablock when generating results.
上级
c91f83aa
ee470a94
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
70 addition
and
50 deletion
+70
-50
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+3
-4
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-0
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+11
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+39
-29
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+15
-17
未找到文件。
source/client/test/clientTests.cpp
浏览文件 @
e6a02f74
...
@@ -567,6 +567,7 @@ TEST(testCase, insert_test) {
...
@@ -567,6 +567,7 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_free_result(pRes);
taos_close(pConn);
taos_close(pConn);
}
}
#endif
TEST
(
testCase
,
projection_query_tables
)
{
TEST
(
testCase
,
projection_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
@@ -605,7 +606,7 @@ TEST(testCase, projection_query_tables) {
...
@@ -605,7 +606,7 @@ TEST(testCase, projection_query_tables) {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
for(int32_t i = 0; i < 100000
00
; i += 20) {
for
(
int32_t
i
=
0
;
i
<
100000
;
i
+=
20
)
{
char
sql
[
1024
]
=
{
0
};
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
sprintf
(
sql
,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
@@ -625,7 +626,7 @@ TEST(testCase, projection_query_tables) {
...
@@ -625,7 +626,7 @@ TEST(testCase, projection_query_tables) {
printf
(
"start to insert next table
\n
"
);
printf
(
"start to insert next table
\n
"
);
for(int32_t i = 0; i < 100000
00
; i += 20) {
for
(
int32_t
i
=
0
;
i
<
100000
;
i
+=
20
)
{
char
sql
[
1024
]
=
{
0
};
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
sprintf
(
sql
,
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
@@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) {
...
@@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) {
taos_close
(
pConn
);
taos_close
(
pConn
);
}
}
#endif
TEST
(
testCase
,
agg_query_tables
)
{
TEST
(
testCase
,
agg_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
ASSERT_NE
(
pConn
,
nullptr
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
e6a02f74
...
@@ -126,6 +126,8 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
...
@@ -126,6 +126,8 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
);
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
);
int32_t
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleAddTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleAddTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleRemoveTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
int32_t
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
bool
tqNextDataBlockFilterOut
(
STqReadHandle
*
pHandle
,
SHashObj
*
filterOutUids
);
bool
tqNextDataBlockFilterOut
(
STqReadHandle
*
pHandle
,
SHashObj
*
filterOutUids
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
e6a02f74
...
@@ -231,3 +231,14 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
...
@@ -231,3 +231,14 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
return
0
;
return
0
;
}
}
int
tqReadHandleRemoveTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
ASSERT
(
pHandle
->
tbIdHash
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashRemove
(
pHandle
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
));
}
return
0
;
}
source/libs/executor/src/executor.c
浏览文件 @
e6a02f74
...
@@ -125,6 +125,33 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
...
@@ -125,6 +125,33 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return
pTaskInfo
;
return
pTaskInfo
;
}
}
static
SArray
*
filterQualifiedChildTables
(
const
SStreamBlockScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
// let's discard the tables those are not created according to the queried super table.
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pScanInfo
->
readHandle
.
meta
,
0
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tableIdList
);
++
i
)
{
int64_t
*
id
=
(
int64_t
*
)
taosArrayGet
(
tableIdList
,
i
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
continue
;
}
ASSERT
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
);
if
(
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
taosArrayPush
(
qa
,
id
);
}
metaReaderClear
(
&
mr
);
return
qa
;
}
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
)
{
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
...
@@ -134,41 +161,24 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
...
@@ -134,41 +161,24 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
pInfo
=
pInfo
->
pDownstream
[
0
];
pInfo
=
pInfo
->
pDownstream
[
0
];
}
}
int32_t
code
=
0
;
SStreamBlockScanInfo
*
pScanInfo
=
pInfo
->
info
;
SStreamBlockScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pScanInfo
->
readHandle
.
meta
,
0
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tableIdList
);
++
i
)
{
int64_t
*
id
=
(
int64_t
*
)
taosArrayGet
(
tableIdList
,
i
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
continue
;
}
ASSERT
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
);
if
(
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
taosArrayPush
(
qa
,
id
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
}
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
qa
);
taosArrayDestroy
(
qa
);
metaReaderClear
(
&
mr
);
}
else
{
// remove the table id in current list
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
qDebug
(
" %d remove child tables from the stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
tableIdList
));
int32_t
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
qa
);
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
tableIdList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosArrayDestroy
(
qa
);
return
code
;
}
}
else
{
assert
(
0
);
}
}
return
TSDB_CODE_SUCCESS
;
return
code
;
}
}
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
e6a02f74
...
@@ -2062,15 +2062,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
...
@@ -2062,15 +2062,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
pAggInfo
->
groupId
=
groupId
;
pAggInfo
->
groupId
=
groupId
;
}
}
/**
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
* For interval query of both super table and table, copy the data in ascending order, since the output results are
* ordered in SWindowResutl already. While handling the group by query for both table and super table,
* all group result are completed already.
*
* @param pQInfo
* @param result
*/
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
start
=
pGroupResInfo
->
index
;
int32_t
start
=
pGroupResInfo
->
index
;
...
@@ -2087,6 +2079,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
...
@@ -2087,6 +2079,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
continue
;
continue
;
}
}
if
(
pBlock
->
info
.
groupId
==
0
)
{
pBlock
->
info
.
groupId
=
pPos
->
groupId
;
}
else
{
// current value belongs to different group, it can't be packed into one datablock
if
(
pBlock
->
info
.
groupId
!=
pPos
->
groupId
)
{
break
;
}
}
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
break
;
break
;
}
}
...
@@ -2100,9 +2101,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
...
@@ -2100,9 +2101,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
int32_t
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
int32_t
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
TAOS_FAILED
(
code
))
{
if
(
TAOS_FAILED
(
code
))
{
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
taskInfo
),
tstrerror
(
code
));
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
taskInfo
->
code
=
code
;
longjmp
(
pTaskInfo
->
env
,
code
);
longjmp
(
taskInfo
->
env
,
code
);
}
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
// do nothing, todo refactor
...
@@ -2124,7 +2124,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
...
@@ -2124,7 +2124,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
}
}
}
}
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv)
);
qDebug
(
"%s result generated, rows:%d, groupId:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pBlock
->
info
.
rows
,
pBlock
->
info
.
groupId
);
blockDataUpdateTsWindow
(
pBlock
);
blockDataUpdateTsWindow
(
pBlock
);
return
0
;
return
0
;
}
}
...
@@ -2145,10 +2145,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
...
@@ -2145,10 +2145,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
return
;
return
;
}
}
// clear the existed group id
pBlock
->
info
.
groupId
=
0
;
doCopyToSDataBlock
(
pTaskInfo
,
pBlock
,
pExprInfo
,
pBuf
,
pGroupResInfo
,
rowCellOffset
,
pCtx
,
numOfExprs
);
doCopyToSDataBlock
(
pTaskInfo
,
pBlock
,
pExprInfo
,
pBuf
,
pGroupResInfo
,
rowCellOffset
,
pCtx
,
numOfExprs
);
// add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow
(
pBlock
);
}
}
static
void
updateNumOfRowsInResultRows
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SResultRowInfo
*
pResultRowInfo
,
static
void
updateNumOfRowsInResultRows
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SResultRowInfo
*
pResultRowInfo
,
...
@@ -3656,7 +3655,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
...
@@ -3656,7 +3655,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted
(
pOperator
);
doSetOperatorCompleted
(
pOperator
);
}
}
doSetOperatorCompleted
(
pOperator
);
return
(
blockDataGetNumOfRows
(
pInfo
->
pRes
)
!=
0
)
?
pInfo
->
pRes
:
NULL
;
return
(
blockDataGetNumOfRows
(
pInfo
->
pRes
)
!=
0
)
?
pInfo
->
pRes
:
NULL
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录