Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
0d2b5ca6
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0d2b5ca6
编写于
4月 17, 2020
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-98] fix bugs in super table group by query
上级
412c7ee3
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
249 addition
and
185 deletion
+249
-185
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+1
-0
src/client/src/tscServer.c
src/client/src/tscServer.c
+13
-10
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+9
-13
src/query/inc/queryExecutor.h
src/query/inc/queryExecutor.h
+6
-5
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+140
-121
src/tsdb/inc/tsdb.h
src/tsdb/inc/tsdb.h
+11
-9
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+17
-0
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+47
-27
src/util/inc/tutil.h
src/util/inc/tutil.h
+5
-0
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
0d2b5ca6
...
@@ -71,6 +71,7 @@ typedef struct STableMeta {
...
@@ -71,6 +71,7 @@ typedef struct STableMeta {
typedef
struct
STableMetaInfo
{
typedef
struct
STableMetaInfo
{
STableMeta
*
pTableMeta
;
// table meta, cached in client side and acquried by name
STableMeta
*
pTableMeta
;
// table meta, cached in client side and acquried by name
SVgroupsInfo
*
vgroupList
;
SVgroupsInfo
*
vgroupList
;
/*
/*
* 1. keep the vnode index during the multi-vnode super table projection query
* 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion
* 2. keep the vnode index for multi-vnode insertion
...
...
src/client/src/tscServer.c
浏览文件 @
0d2b5ca6
...
@@ -651,19 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -651,19 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vgroupInfo
.
vgId
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vgroupInfo
.
vgId
);
tscTrace
(
"%p queried tables:%d, table id: %s"
,
pSql
,
1
,
pTableMetaInfo
->
name
);
tscTrace
(
"%p queried tables:%d, table id: %s"
,
pSql
,
1
,
pTableMetaInfo
->
name
);
}
else
{
// query super table
}
else
{
// query super table
int32_t
index
=
pTableMetaInfo
->
vgroupIndex
;
if
(
pTableMetaInfo
->
vgroupI
ndex
<
0
)
{
if
(
i
ndex
<
0
)
{
tscError
(
"%p error v
nodeIdx:%d"
,
pSql
,
pTableMetaInfo
->
vgroupI
ndex
);
tscError
(
"%p error v
groupIndex:%d"
,
pSql
,
i
ndex
);
return
-
1
;
return
-
1
;
}
}
pSql
->
ipList
.
numOfIps
=
1
;
// todo fix me
SCMVgroupInfo
*
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
index
];
pSql
->
ipList
.
numOfIps
=
pVgroupInfo
->
numOfIps
;
// todo fix me
pSql
->
ipList
.
port
=
tsDnodeShellPort
;
pSql
->
ipList
.
port
=
tsDnodeShellPort
;
pSql
->
ipList
.
inUse
=
0
;
pSql
->
ipList
.
inUse
=
0
;
// todo extract method
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfIps
;
++
i
)
{
SCMVgroupInfo
*
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
pTableMetaInfo
->
vgroupIndex
]
;
pSql
->
ipList
.
ip
[
i
]
=
pVgroupInfo
->
ipAddr
[
i
].
ip
;
pSql
->
ipList
.
ip
[
0
]
=
pVgroupInfo
->
ipAddr
[
0
].
ip
;
}
#if 0
#if 0
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
...
@@ -676,8 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -676,8 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
#endif
#endif
tscTrace
(
"%p query on super table, numOfVgroup:%d, vgroupIndex:%d"
,
pSql
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
,
tscTrace
(
"%p query on super table, numOfVgroup:%d, vgroupIndex:%d"
,
pSql
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
,
index
);
pTableMetaInfo
->
vgroupIndex
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pVgroupInfo
->
vgId
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pVgroupInfo
->
vgId
);
numOfTables
=
1
;
numOfTables
=
1
;
...
@@ -2133,10 +2135,11 @@ _error_clean:
...
@@ -2133,10 +2135,11 @@ _error_clean:
for
(
int32_t
i
=
0
;
i
<
pInfo
->
vgroupList
->
numOfVgroups
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInfo
->
vgroupList
->
numOfVgroups
;
++
i
)
{
SCMVgroupInfo
*
pVgroups
=
&
pInfo
->
vgroupList
->
vgroups
[
i
];
SCMVgroupInfo
*
pVgroups
=
&
pInfo
->
vgroupList
->
vgroups
[
i
];
pVgroups
->
numOfIps
=
htonl
(
pVgroups
->
numOfIps
);
pVgroups
->
vgId
=
htonl
(
pVgroups
->
vgId
);
pVgroups
->
vgId
=
htonl
(
pVgroups
->
vgId
);
assert
(
pVgroups
->
numOfIps
>=
1
);
for
(
int32_t
j
=
0
;
j
<
tListLen
(
pVgroups
->
ipAddr
)
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pVgroups
->
numOfIps
;
++
j
)
{
pVgroups
->
ipAddr
[
j
].
ip
=
htonl
(
pVgroups
->
ipAddr
[
j
].
ip
);
pVgroups
->
ipAddr
[
j
].
ip
=
htonl
(
pVgroups
->
ipAddr
[
j
].
ip
);
pVgroups
->
ipAddr
[
j
].
port
=
htons
(
pVgroups
->
ipAddr
[
j
].
port
);
pVgroups
->
ipAddr
[
j
].
port
=
htons
(
pVgroups
->
ipAddr
[
j
].
port
);
}
}
...
...
src/client/src/tscUtil.c
浏览文件 @
0d2b5ca6
...
@@ -2129,26 +2129,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
...
@@ -2129,26 +2129,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if
(
pPrevSql
==
NULL
)
{
if
(
pPrevSql
==
NULL
)
{
STableMeta
*
pTableMeta
=
taosCacheAcquireByName
(
tscCacheHandle
,
name
);
STableMeta
*
pTableMeta
=
taosCacheAcquireByName
(
tscCacheHandle
,
name
);
// SSuperTableMeta* pMetricMeta = NULL;
// if (cmd == TSDB_SQL_SELECT) {
// pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
// }
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pTableMeta
,
pTableMetaInfo
->
vgroupList
,
pTableMetaInfo
->
numOfTags
,
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pTableMeta
,
pTableMetaInfo
->
vgroupList
,
pTableMetaInfo
->
numOfTags
,
pTableMetaInfo
->
tagColumnIndex
);
pTableMetaInfo
->
tagColumnIndex
);
}
else
{
// transfer the ownership of pTableMeta
/pMetricMeta
to the newly create sql object.
}
else
{
// transfer the ownership of pTableMeta to the newly create sql object.
//
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMetaInfo
*
pPrevInfo
=
tscGetTableMetaInfoFromCmd
(
&
pPrevSql
->
cmd
,
pPrevSql
->
cmd
.
clauseIndex
,
0
);
// STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
STableMeta
*
pPrevTableMeta
=
taosCacheTransfer
(
tscCacheHandle
,
(
void
**
)
&
pPrevInfo
->
pTableMeta
);
// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
SVgroupsInfo
*
pVgroupsInfo
=
pPrevInfo
->
vgroupList
;
pPrevInfo
->
vgroupList
=
NULL
;
// pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta
, pTableMetaInfo->numOfTags,
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pPrevTableMeta
,
pVgroupsInfo
,
pTableMetaInfo
->
numOfTags
,
//
pTableMetaInfo->tagColumnIndex);
pTableMetaInfo
->
tagColumnIndex
);
}
}
assert
(
pFinalInfo
->
pTableMeta
!=
NULL
&&
pNewQueryInfo
->
numOfTables
==
1
);
assert
(
pFinalInfo
->
pTableMeta
!=
NULL
&&
pNewQueryInfo
->
numOfTables
==
1
);
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
// assert(pFinalInfo->pMetricMeta
!= NULL);
assert
(
pFinalInfo
->
vgroupList
!=
NULL
);
}
}
if
(
cmd
==
TSDB_SQL_SELECT
)
{
if
(
cmd
==
TSDB_SQL_SELECT
)
{
...
...
src/query/inc/queryExecutor.h
浏览文件 @
0d2b5ca6
...
@@ -18,6 +18,7 @@
...
@@ -18,6 +18,7 @@
#include "os.h"
#include "os.h"
#include "hash.h"
#include "hash.h"
#include "tsdb.h"
#include "qinterpolation.h"
#include "qinterpolation.h"
#include "qresultBuf.h"
#include "qresultBuf.h"
#include "qsqlparser.h"
#include "qsqlparser.h"
...
@@ -108,8 +109,8 @@ typedef struct STableQueryInfo {
...
@@ -108,8 +109,8 @@ typedef struct STableQueryInfo {
}
STableQueryInfo
;
}
STableQueryInfo
;
typedef
struct
STableDataInfo
{
typedef
struct
STableDataInfo
{
int32_t
numOfBlocks
;
//
int32_t numOfBlocks;
int32_t
start
;
// start block index
//
int32_t start; // start block index
int32_t
tableIndex
;
int32_t
tableIndex
;
int32_t
groupIdx
;
// group id in table list
int32_t
groupIdx
;
// group id in table list
STableQueryInfo
*
pTableQInfo
;
STableQueryInfo
*
pTableQInfo
;
...
@@ -171,7 +172,7 @@ typedef struct SQInfo {
...
@@ -171,7 +172,7 @@ typedef struct SQInfo {
int32_t
pointsInterpo
;
int32_t
pointsInterpo
;
int32_t
code
;
// error code to returned to client
int32_t
code
;
// error code to returned to client
sem_t
dataReady
;
sem_t
dataReady
;
S
Array
*
pTableList
;
// table id list
S
TableGroupInfo
groupInfo
;
// table id list
void
*
tsdb
;
void
*
tsdb
;
SQueryRuntimeEnv
runtimeEnv
;
SQueryRuntimeEnv
runtimeEnv
;
...
@@ -187,7 +188,7 @@ typedef struct SQInfo {
...
@@ -187,7 +188,7 @@ typedef struct SQInfo {
*/
*/
int32_t
tableIndex
;
int32_t
tableIndex
;
int32_t
numOfGroupResultPages
;
int32_t
numOfGroupResultPages
;
STableDataInfo
*
pTableDataInfo
;
//
STableDataInfo* pTableDataInfo;
TSKEY
*
tsList
;
TSKEY
*
tsList
;
}
SQInfo
;
}
SQInfo
;
...
...
src/query/src/queryExecutor.c
浏览文件 @
0d2b5ca6
...
@@ -12,6 +12,7 @@
...
@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <tsdbMain.h>
#include "os.h"
#include "os.h"
#include "hash.h"
#include "hash.h"
...
@@ -92,7 +93,7 @@ enum {
...
@@ -92,7 +93,7 @@ enum {
TS_JOIN_TAG_NOT_EQUALS
=
2
,
TS_JOIN_TAG_NOT_EQUALS
=
2
,
};
};
static
int32_t
mergeIntoGroupResultImpl
(
SQInfo
*
pQInfo
,
S
TableDataInfo
*
pTableDataInfo
,
int32_t
start
,
int32_t
end
);
static
int32_t
mergeIntoGroupResultImpl
(
SQInfo
*
pQInfo
,
S
Array
*
group
);
static
void
setWindowResOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pResult
);
static
void
setWindowResOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pResult
);
static
void
resetMergeResultBuf
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
,
SResultInfo
*
pResultInfo
);
static
void
resetMergeResultBuf
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
,
SResultInfo
*
pResultInfo
);
...
@@ -104,7 +105,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData,
...
@@ -104,7 +105,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData,
int32_t
functionId
,
SDataStatis
*
pStatis
,
bool
hasNull
,
void
*
param
,
int32_t
scanFlag
);
int32_t
functionId
,
SDataStatis
*
pStatis
,
bool
hasNull
,
void
*
param
,
int32_t
scanFlag
);
static
void
initCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
static
void
initCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
static
void
destroyMeterQueryInfo
(
STableQueryInfo
*
pTableQueryInfo
,
int32_t
numOfCols
);
static
void
destroyMeterQueryInfo
(
STableQueryInfo
*
pTableQueryInfo
,
int32_t
numOfCols
);
static
int32_t
setAdditionalInfo
(
SQInfo
*
pQInfo
,
int32_t
meterIdx
,
STableQueryInfo
*
pTableQueryInfo
);
static
int32_t
setAdditionalInfo
(
SQInfo
*
pQInfo
,
STable
*
pTable
,
STableQueryInfo
*
pTableQueryInfo
);
static
void
resetCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
static
void
resetCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
static
bool
hasMainOutput
(
SQuery
*
pQuery
);
static
bool
hasMainOutput
(
SQuery
*
pQuery
);
...
@@ -2185,7 +2186,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
...
@@ -2185,7 +2186,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
num
=
128
;
num
=
128
;
}
else
if
(
isIntervalQuery
(
pQuery
))
{
// time window query, allocate one page for each table
}
else
if
(
isIntervalQuery
(
pQuery
))
{
// time window query, allocate one page for each table
size_t
s
=
taosArrayGetSize
(
pQInfo
->
pTableList
)
;
size_t
s
=
pQInfo
->
groupInfo
.
numOfTables
;
num
=
MAX
(
s
,
INITIAL_RESULT_ROWS_VALUE
);
num
=
MAX
(
s
,
INITIAL_RESULT_ROWS_VALUE
);
}
else
{
// for super table query, one page for each subset
}
else
{
// for super table query, one page for each subset
num
=
1
;
//pQInfo->pSidSet->numOfSubSet;
num
=
1
;
//pQInfo->pSidSet->numOfSubSet;
...
@@ -2253,7 +2254,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
...
@@ -2253,7 +2254,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
// get one queried meter
// get one queried meter
assert
(
0
);
assert
(
0
);
// SMeterObj *pMeter = getMeterObj(pQInfo->
pTableList
, pQInfo->pSidSet->pTableIdList[0]->sid);
// SMeterObj *pMeter = getMeterObj(pQInfo->
groupInfo
, pQInfo->pSidSet->pTableIdList[0]->sid);
pRuntimeEnv
->
pTSBuf
=
param
;
pRuntimeEnv
->
pTSBuf
=
param
;
pRuntimeEnv
->
cur
.
vnodeIndex
=
-
1
;
pRuntimeEnv
->
cur
.
vnodeIndex
=
-
1
;
...
@@ -2298,10 +2299,8 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
...
@@ -2298,10 +2299,8 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
.
colList
=
pQuery
->
colList
,
.
colList
=
pQuery
->
colList
,
};
};
SArray
*
sa
=
taosArrayInit
(
1
,
POINTER_BYTES
);
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
// SMeterObj *p1 = getMeterObj(pQInfo->
pTableList
, pQInfo->pSidSet->pTableIdList[i]->sid);
// SMeterObj *p1 = getMeterObj(pQInfo->
groupInfo
, pQInfo->pSidSet->pTableIdList[i]->sid);
// taosArrayPush(sa, &p1);
// taosArrayPush(sa, &p1);
// }
// }
...
@@ -2310,7 +2309,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
...
@@ -2310,7 +2309,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
}
}
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
NULL
,
&
cond
,
sa
,
cols
);
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
NULL
,
&
cond
,
&
pQInfo
->
groupInfo
,
cols
);
// metric query do not invoke interpolation, it will be done at the second-stage merge
// metric query do not invoke interpolation, it will be done at the second-stage merge
if
(
!
isPointInterpoQuery
(
pQuery
))
{
if
(
!
isPointInterpoQuery
(
pQuery
))
{
...
@@ -2331,18 +2330,18 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
...
@@ -2331,18 +2330,18 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
*/
*/
void
vnodeDecMeterRefcnt
(
SQInfo
*
pQInfo
)
{
void
vnodeDecMeterRefcnt
(
SQInfo
*
pQInfo
)
{
if
(
pQInfo
!=
NULL
)
{
if
(
pQInfo
!=
NULL
)
{
// assert(taosHashGetSize(pQInfo->
pTableList
) >= 1);
// assert(taosHashGetSize(pQInfo->
groupInfo
) >= 1);
}
}
#if 0
#if 0
if (pQInfo == NULL || pQInfo->numOfTables == 1) {
if (pQInfo == NULL || pQInfo->
groupInfo.
numOfTables == 1) {
atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1);
atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1);
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode,
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode,
pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries);
pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries);
} else {
} else {
int32_t num = 0;
int32_t num = 0;
for (int32_t i = 0; i < pQInfo->numOfTables; ++i) {
for (int32_t i = 0; i < pQInfo->
groupInfo.
numOfTables; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->
pTableList
, pQInfo->pSidSet->pTableIdList[i]->sid);
SMeterObj *pMeter = getMeterObj(pQInfo->
groupInfo
, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
if (pMeter->numOfQueries > 0) {
...
@@ -2356,9 +2355,9 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
...
@@ -2356,9 +2355,9 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
* in order to reduce log output, for all meters of which numOfQueries count are 0,
* in order to reduce log output, for all meters of which numOfQueries count are 0,
* we do not output corresponding information
* we do not output corresponding information
*/
*/
num = pQInfo->numOfTables - num;
num = pQInfo->
groupInfo.
numOfTables - num;
dTrace("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo,
dTrace("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo,
pQInfo->numOfTables, num);
pQInfo->
groupInfo.
numOfTables, num);
}
}
#endif
#endif
}
}
...
@@ -2663,32 +2662,27 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
...
@@ -2663,32 +2662,27 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
* set tag value in SQLFunctionCtx
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
* e.g.,tag information into input buffer
*/
*/
static
void
doSetTagValueInParam
(
SColumnModel
*
pTagSchema
,
int32_t
tagColIdx
,
void
*
pMeterSidInfo
,
tVariant
*
param
)
{
static
void
doSetTagValueInParam
(
void
*
tsdb
,
STableId
id
,
int32_t
tagColIdx
,
tVariant
*
param
)
{
assert
(
tagColIdx
>=
0
);
assert
(
tagColIdx
>=
0
);
#if 0
int16_t offset = getColumnModelOffset(pTagSchema, tagColIdx);
void * pStr = (char *)pMeterSidInfo->tags + offset;
SSchema *pCol = getColumnModelSchema(pTagSchema, tagColIdx);
tVariantDestroy
(
param
);
tVariantDestroy
(
param
);
if (isNull(pStr, pCol->type)) {
char
*
val
=
NULL
;
param->nType = TSDB_DATA_TYPE_NULL
;
int16_t
bytes
=
0
;
} else {
int16_t
type
=
0
;
tVariantCreateFromBinary(param, pStr, pCol->bytes, pCol->type);
}
tsdbGetTableTagVal
(
tsdb
,
id
,
tagColIdx
,
&
type
,
&
bytes
,
&
val
);
#endif
tVariantCreateFromBinary
(
param
,
val
,
bytes
,
type
);
}
}
void
vnodeSetTagValueInParam
(
STableGroup
List
*
pSidSet
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
void
*
pMeterSidInfo
)
{
void
vnodeSetTagValueInParam
(
STableGroup
Info
*
groupList
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
STableId
id
,
void
*
tsdb
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SColumnModel
*
pTagSchema
=
NULL
;
//pSidSet->pColumnModel;
//
SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel;
SSqlFuncExprMsg
*
pFuncMsg
=
&
pQuery
->
pSelectExpr
[
0
].
pBase
;
SSqlFuncExprMsg
*
pFuncMsg
=
&
pQuery
->
pSelectExpr
[
0
].
pBase
;
if
(
pQuery
->
numOfOutputCols
==
1
&&
pFuncMsg
->
functionId
==
TSDB_FUNC_TS_COMP
)
{
if
(
pQuery
->
numOfOutputCols
==
1
&&
pFuncMsg
->
functionId
==
TSDB_FUNC_TS_COMP
)
{
assert
(
pFuncMsg
->
numOfParams
==
1
);
assert
(
pFuncMsg
->
numOfParams
==
1
);
doSetTagValueInParam
(
pTagSchema
,
pFuncMsg
->
arg
->
argValue
.
i64
,
pMeterSidInfo
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
);
doSetTagValueInParam
(
tsdb
,
id
,
pFuncMsg
->
arg
->
argValue
.
i64
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
);
}
else
{
}
else
{
// set tag value, by which the results are aggregated.
// set tag value, by which the results are aggregated.
for
(
int32_t
idx
=
0
;
idx
<
pQuery
->
numOfOutputCols
;
++
idx
)
{
for
(
int32_t
idx
=
0
;
idx
<
pQuery
->
numOfOutputCols
;
++
idx
)
{
...
@@ -2699,14 +2693,14 @@ void vnodeSetTagValueInParam(STableGroupList *pSidSet, SQueryRuntimeEnv *pRuntim
...
@@ -2699,14 +2693,14 @@ void vnodeSetTagValueInParam(STableGroupList *pSidSet, SQueryRuntimeEnv *pRuntim
continue
;
continue
;
}
}
doSetTagValueInParam
(
pTagSchema
,
pColEx
->
colIndex
,
pMeterSidInfo
,
&
pRuntimeEnv
->
pCtx
[
idx
].
tag
);
doSetTagValueInParam
(
tsdb
,
id
,
pColEx
->
colIndex
,
&
pRuntimeEnv
->
pCtx
[
idx
].
tag
);
}
}
// set the join tag for first column
// set the join tag for first column
if
(
pFuncMsg
->
functionId
==
TSDB_FUNC_TS
&&
pFuncMsg
->
colInfo
.
colIndex
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
&&
if
(
pFuncMsg
->
functionId
==
TSDB_FUNC_TS
&&
pFuncMsg
->
colInfo
.
colIndex
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
&&
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
assert
(
pFuncMsg
->
numOfParams
==
1
);
assert
(
pFuncMsg
->
numOfParams
==
1
);
doSetTagValueInParam
(
pTagSchema
,
pFuncMsg
->
arg
->
argValue
.
i64
,
pMeterSidInfo
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
);
//
doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag);
}
}
}
}
}
}
...
@@ -2915,10 +2909,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
...
@@ -2915,10 +2909,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int32_t
ret
=
TSDB_CODE_SUCCESS
;
int32_t
ret
=
TSDB_CODE_SUCCESS
;
// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
int32_t
start
=
0
;
//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
//
int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
int32_t
end
=
taosArrayGetSize
(
pQInfo
->
pTableList
)
-
1
;
//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
// int32_t end = pQInfo->groupInfo.numOfTables
- 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
ret
=
mergeIntoGroupResultImpl
(
pQInfo
,
pQInfo
->
pTableDataInfo
,
start
,
end
);
SArray
*
group
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
0
);
ret
=
mergeIntoGroupResultImpl
(
pQInfo
,
group
);
if
(
ret
<
0
)
{
// not enough disk space to save the data into disk
if
(
ret
<
0
)
{
// not enough disk space to save the data into disk
return
-
1
;
return
-
1
;
}
}
...
@@ -3015,22 +3010,26 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
...
@@ -3015,22 +3010,26 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
return
maxOutput
;
return
maxOutput
;
}
}
int32_t
mergeIntoGroupResultImpl
(
SQInfo
*
pQInfo
,
S
TableDataInfo
*
pTableDataInfo
,
int32_t
start
,
int32_t
end
)
{
int32_t
mergeIntoGroupResultImpl
(
SQInfo
*
pQInfo
,
S
Array
*
pGroup
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
tFilePage
**
buffer
=
(
tFilePage
**
)
pQuery
->
sdata
;
size_t
size
=
taosArrayGetSize
(
pGroup
);
int32_t
*
posList
=
calloc
((
end
-
start
),
sizeof
(
int32_t
));
STableDataInfo
**
pTableList
=
malloc
(
POINTER_BYTES
*
(
end
-
start
));
tFilePage
**
buffer
=
(
tFilePage
**
)
pQuery
->
sdata
;
int32_t
*
posList
=
calloc
(
size
,
sizeof
(
int32_t
));
STableDataInfo
**
pTableList
=
malloc
(
POINTER_BYTES
*
size
);
// todo opt for the case of one table per group
// todo opt for the case of one table per group
int32_t
numOfTables
=
0
;
int32_t
numOfTables
=
0
;
for
(
int32_t
i
=
start
;
i
<
end
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
int32_t
tid
=
pTableDataInfo
[
i
].
pTableQInfo
->
tid
;
SPair
*
p
=
taosArrayGet
(
pGroup
,
i
);
STableQueryInfo
*
pInfo
=
p
->
sec
;
SIDList
list
=
getDataBufPagesIdList
(
pRuntimeEnv
->
pResultBuf
,
tid
);
SIDList
list
=
getDataBufPagesIdList
(
pRuntimeEnv
->
pResultBuf
,
pInfo
->
tid
);
if
(
list
.
size
>
0
&&
p
TableDataInfo
[
i
].
pTableQ
Info
->
windowResInfo
.
size
>
0
)
{
if
(
list
.
size
>
0
&&
pInfo
->
windowResInfo
.
size
>
0
)
{
pTableList
[
numOfTables
]
=
&
pTableDataInfo
[
i
];
//
pTableList[numOfTables] = &pTableDataInfo[i];
numOfTables
+=
1
;
numOfTables
+=
1
;
}
}
}
}
...
@@ -3258,14 +3257,12 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
...
@@ -3258,14 +3257,12 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
}
}
if
(
isIntervalQuery
(
pQuery
))
{
if
(
isIntervalQuery
(
pQuery
))
{
size_t
numOfTables
=
taosArrayGetSize
(
pQInfo
->
pTableList
);
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
STableQueryInfo
*
pTableQueryInfo
=
pQInfo
->
pTableDataInfo
[
i
].
pTableQInfo
;
//
SWindowResInfo
*
pWindowResInfo
=
&
pTableQueryInfo
->
windowResInfo
;
// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
// }
doDisableFunctsForSupplementaryScan
(
pQuery
,
pWindowResInfo
,
order
);
}
}
else
{
}
else
{
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
doDisableFunctsForSupplementaryScan
(
pQuery
,
pWindowResInfo
,
order
);
doDisableFunctsForSupplementaryScan
(
pQuery
,
pWindowResInfo
,
order
);
...
@@ -3557,7 +3554,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -3557,7 +3554,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
if
(
pRuntimeEnv
->
pSecQueryHandle
!=
NULL
)
{
if
(
pRuntimeEnv
->
pSecQueryHandle
!=
NULL
)
{
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
pQInfo
->
pTableList
,
cols
);
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
pQInfo
->
groupInfo
,
cols
);
}
}
taosArrayDestroy
(
cols
);
taosArrayDestroy
(
cols
);
...
@@ -3654,7 +3651,7 @@ STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid
...
@@ -3654,7 +3651,7 @@ STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid
return
pTableQueryInfo
;
return
pTableQueryInfo
;
}
}
void
destroyMeterQueryInfo
(
STableQueryInfo
*
pTableQueryInfo
,
int32_t
numOfCols
)
{
UNUSED_FUNC
void
destroyMeterQueryInfo
(
STableQueryInfo
*
pTableQueryInfo
,
int32_t
numOfCols
)
{
if
(
pTableQueryInfo
==
NULL
)
{
if
(
pTableQueryInfo
==
NULL
)
{
return
;
return
;
}
}
...
@@ -3700,7 +3697,7 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
...
@@ -3700,7 +3697,7 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
* @param pRuntimeEnv
* @param pRuntimeEnv
* @param pDataBlockInfo
* @param pDataBlockInfo
*/
*/
void
setExecutionContext
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableQueryInfo
,
int32_t
meterIdx
,
int32_t
groupIdx
,
void
setExecutionContext
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableQueryInfo
,
STable
*
pTable
,
int32_t
groupIdx
,
TSKEY
nextKey
)
{
TSKEY
nextKey
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
...
@@ -3726,7 +3723,7 @@ void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, int32
...
@@ -3726,7 +3723,7 @@ void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, int32
initCtxOutputBuf
(
pRuntimeEnv
);
initCtxOutputBuf
(
pRuntimeEnv
);
pTableQueryInfo
->
lastKey
=
nextKey
;
pTableQueryInfo
->
lastKey
=
nextKey
;
setAdditionalInfo
(
pQInfo
,
meterIdx
,
pTableQueryInfo
);
setAdditionalInfo
(
pQInfo
,
pTable
,
pTableQueryInfo
);
}
}
static
void
setWindowResOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pResult
)
{
static
void
setWindowResOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pResult
)
{
...
@@ -3754,11 +3751,11 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
...
@@ -3754,11 +3751,11 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
}
}
}
}
int32_t
setAdditionalInfo
(
SQInfo
*
pQInfo
,
int32_t
meterIdx
,
STableQueryInfo
*
pTableQueryInfo
)
{
int32_t
setAdditionalInfo
(
SQInfo
*
pQInfo
,
STable
*
pTable
,
STableQueryInfo
*
pTableQueryInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
assert
(
pTableQueryInfo
->
lastKey
>
0
);
assert
(
pTableQueryInfo
->
lastKey
>
0
);
// vnodeSetTagValueInParam(pQInfo->pSidSet, pRuntimeEnv, pQInfo->pMeterSidExtInfo[meterIdx]
);
vnodeSetTagValueInParam
(
&
pQInfo
->
groupInfo
,
pRuntimeEnv
,
pTable
->
tableId
,
pQInfo
->
tsdb
);
// both the master and supplement scan needs to set the correct ts comp start position
// both the master and supplement scan needs to set the correct ts comp start position
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
...
@@ -3866,7 +3863,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
...
@@ -3866,7 +3863,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
isIntervalQuery
(
pQuery
)))
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
isIntervalQuery
(
pQuery
)))
{
totalSubset
=
numOfClosedTimeWindow
(
&
pQInfo
->
runtimeEnv
.
windowResInfo
);
totalSubset
=
numOfClosedTimeWindow
(
&
pQInfo
->
runtimeEnv
.
windowResInfo
);
}
else
{
}
else
{
totalSubset
=
1
;
//pQInfo->pSidSet->numOfSubSet
;
totalSubset
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
)
;
}
}
return
totalSubset
;
return
totalSubset
;
...
@@ -4193,7 +4190,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
...
@@ -4193,7 +4190,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
}
}
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
pQInfo
->
pTableList
,
cols
);
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQInfo
->
groupInfo
,
cols
);
taosArrayDestroy
(
cols
);
taosArrayDestroy
(
cols
);
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
tsdb
=
tsdb
;
...
@@ -4294,7 +4291,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
...
@@ -4294,7 +4291,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
UNUSED_FUNC
bool
isGroupbyEachTable
(
SSqlGroupbyExpr
*
pGroupbyExpr
,
STableGroup
List
*
pSidset
)
{
static
UNUSED_FUNC
bool
isGroupbyEachTable
(
SSqlGroupbyExpr
*
pGroupbyExpr
,
STableGroup
Info
*
pSidset
)
{
if
(
pGroupbyExpr
==
NULL
||
pGroupbyExpr
->
numOfGroupCols
==
0
)
{
if
(
pGroupbyExpr
==
NULL
||
pGroupbyExpr
->
numOfGroupCols
==
0
)
{
return
false
;
return
false
;
}
}
...
@@ -4335,7 +4332,6 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
...
@@ -4335,7 +4332,6 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int64_t
st
=
taosGetTimestampMs
();
int64_t
st
=
taosGetTimestampMs
();
size_t
numOfTables
=
taosArrayGetSize
(
pQInfo
->
pTableList
);
tsdb_query_handle_t
*
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
tsdb_query_handle_t
*
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
...
@@ -4345,14 +4341,25 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
...
@@ -4345,14 +4341,25 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SDataBlockInfo
blockInfo
=
tsdbRetrieveDataBlockInfo
(
pQueryHandle
);
SDataBlockInfo
blockInfo
=
tsdbRetrieveDataBlockInfo
(
pQueryHandle
);
STableDataInfo
*
pTableDataInfo
=
NULL
;
STableDataInfo
*
pTableDataInfo
=
NULL
;
STable
*
pTable
=
NULL
;
// todo opt performance using hash table
size_t
numOfGroup
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroup
;
++
i
)
{
SArray
*
group
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
i
);
// todo opt performance
size_t
num
=
taosArrayGetSize
(
group
);
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
if
(
pQInfo
->
pTableDataInfo
[
i
].
pTableQInfo
->
tid
==
blockInfo
.
sid
)
{
SPair
*
p
=
taosArrayGet
(
group
,
j
);
pTableDataInfo
=
&
pQInfo
->
pTableDataInfo
[
i
];
STableDataInfo
*
pInfo
=
p
->
sec
;
if
(
pInfo
->
pTableQInfo
->
tid
==
blockInfo
.
sid
)
{
pTableDataInfo
=
p
->
sec
;
pTable
=
p
->
first
;
break
;
break
;
}
}
}
}
}
assert
(
pTableDataInfo
!=
NULL
&&
pTableDataInfo
->
pTableQInfo
!=
NULL
);
assert
(
pTableDataInfo
!=
NULL
&&
pTableDataInfo
->
pTableQInfo
!=
NULL
);
STableQueryInfo
*
pTableQueryInfo
=
pTableDataInfo
->
pTableQInfo
;
STableQueryInfo
*
pTableQueryInfo
=
pTableDataInfo
->
pTableQInfo
;
...
@@ -4364,10 +4371,10 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
...
@@ -4364,10 +4371,10 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
TSKEY
nextKey
=
blockInfo
.
window
.
ekey
;
TSKEY
nextKey
=
blockInfo
.
window
.
ekey
;
if
(
!
isIntervalQuery
(
pQuery
))
{
if
(
!
isIntervalQuery
(
pQuery
))
{
setExecutionContext
(
pQInfo
,
pTableQueryInfo
,
pTable
DataInfo
->
tableIndex
,
pTableDataInfo
->
groupIdx
,
nextKey
);
setExecutionContext
(
pQInfo
,
pTableQueryInfo
,
pTable
,
pTableDataInfo
->
groupIdx
,
nextKey
);
}
else
{
// interval query
}
else
{
// interval query
setIntervalQueryRange
(
pTableQueryInfo
,
pQInfo
,
nextKey
);
setIntervalQueryRange
(
pTableQueryInfo
,
pQInfo
,
nextKey
);
int32_t
ret
=
setAdditionalInfo
(
pQInfo
,
pTable
DataInfo
->
tableIndex
,
pTableQueryInfo
);
int32_t
ret
=
setAdditionalInfo
(
pQInfo
,
pTable
,
pTableQueryInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pQInfo
->
code
=
ret
;
pQInfo
->
code
=
ret
;
...
@@ -4493,7 +4500,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
...
@@ -4493,7 +4500,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
#if 0
#if 0
// STableGroup
List
*pTableIdList = pSupporter->pSidSet;
// STableGroup
Info
*pTableIdList = pSupporter->pSidSet;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
...
@@ -4747,35 +4754,40 @@ static void createTableDataInfo(SQInfo* pQInfo) {
...
@@ -4747,35 +4754,40 @@ static void createTableDataInfo(SQInfo* pQInfo) {
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
int32_t
numOfTables
=
taosArrayGetSize
(
pQInfo
->
pTableList
);
// if (pQInfo->pTableDataInfo == NULL) {
// pQInfo->pTableDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * pQInfo->groupInfo.numOfTables);
// if (pQInfo->pTableDataInfo == NULL) {
// dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno));
// pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
// return;
// }
if
(
pQInfo
->
pTableDataInfo
==
NULL
)
{
size_t
numOfGroups
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
);
pQInfo
->
pTableDataInfo
=
(
STableDataInfo
*
)
calloc
(
1
,
sizeof
(
STableDataInfo
)
*
numOfTables
);
int32_t
index
=
0
;
if
(
pQInfo
->
pTableDataInfo
==
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
// load all meter meta info
dError
(
"QInfo:%p failed to allocate memory, %s"
,
pQInfo
,
strerror
(
errno
));
SArray
*
group
=
*
(
SArray
**
)
taosArrayGet
(
pQInfo
->
groupInfo
.
pGroupList
,
i
);
pQInfo
->
code
=
-
TSDB_CODE_SERV_OUT_OF_MEMORY
;
return
;
}
int32_t
groupId
=
0
;
size_t
s
=
taosArrayGetSize
(
group
)
;
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
// load all meter meta info
for
(
int32_t
j
=
0
;
j
<
s
;
++
j
)
{
S
TableId
*
id
=
taosArrayGet
(
pQInfo
->
pTableList
,
i
);
S
Pair
*
p
=
(
SPair
*
)
taosArrayGet
(
group
,
j
);
STableDataInfo
*
pInfo
=
&
pQInfo
->
pTableDataInfo
[
i
]
;
STableDataInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableDataInfo
))
;
setTableDataInfo
(
pInfo
,
i
,
groupId
);
setTableDataInfo
(
pInfo
,
index
,
i
);
pInfo
->
pTableQInfo
=
createTableQueryInfo
(
&
pQInfo
->
runtimeEnv
,
id
->
tid
,
pQuery
->
window
);
pInfo
->
pTableQInfo
=
createTableQueryInfo
(
&
pQInfo
->
runtimeEnv
,
((
STable
*
)(
p
->
first
))
->
tableId
.
tid
,
pQuery
->
window
);
p
->
sec
=
pInfo
;
index
+=
1
;
}
}
}
}
}
}
static
void
prepareQueryInfoForReverseScan
(
SQInfo
*
pQInfo
)
{
static
void
prepareQueryInfoForReverseScan
(
SQInfo
*
pQInfo
)
{
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
// SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t
numOfTables
=
taosArrayGetSize
(
pQInfo
->
pTableList
);
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
// for (int32_t i = 0; i < pQInfo->groupInfo.
numOfTables; ++i) {
STableQueryInfo
*
pTableQueryInfo
=
pQInfo
->
pTableDataInfo
[
i
].
pTableQInfo
;
//
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
changeMeterQueryInfoForSuppleQuery
(
pQuery
,
pTableQueryInfo
);
//
changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
}
//
}
}
}
static
void
doSaveContext
(
SQInfo
*
pQInfo
)
{
static
void
doSaveContext
(
SQInfo
*
pQInfo
)
{
...
@@ -4809,12 +4821,23 @@ static void doRestoreContext(SQInfo* pQInfo) {
...
@@ -4809,12 +4821,23 @@ static void doRestoreContext(SQInfo* pQInfo) {
static
void
doCloseAllTimeWindowAfterScan
(
SQInfo
*
pQInfo
)
{
static
void
doCloseAllTimeWindowAfterScan
(
SQInfo
*
pQInfo
)
{
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
size_t
numOfTables
=
taosArrayGetSize
(
pQInfo
->
pTableList
);
if
(
isIntervalQuery
(
pQuery
))
{
if
(
isIntervalQuery
(
pQuery
))
{
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
STableQueryInfo
*
pTableQueryInfo
=
pQInfo
->
pTableDataInfo
[
i
].
pTableQInfo
;
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
closeAllTimeWindow
(
&
pTableQueryInfo
->
windowResInfo
);
// closeAllTimeWindow(&pTableQueryInfo->windowResInfo);
// }
size_t
numOfGroup
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroup
;
++
i
)
{
SArray
*
group
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
i
);
size_t
num
=
taosArrayGetSize
(
group
);
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
SPair
*
p
=
taosArrayGet
(
group
,
j
);
STableDataInfo
*
pInfo
=
p
->
sec
;
closeAllTimeWindow
(
&
pInfo
->
pTableQInfo
->
windowResInfo
);
}
}
}
}
else
{
// close results for group result
}
else
{
// close results for group result
closeAllTimeWindow
(
&
pQInfo
->
runtimeEnv
.
windowResInfo
);
closeAllTimeWindow
(
&
pQInfo
->
runtimeEnv
.
windowResInfo
);
...
@@ -5140,15 +5163,15 @@ static void tableQueryImpl(SQInfo* pQInfo) {
...
@@ -5140,15 +5163,15 @@ static void tableQueryImpl(SQInfo* pQInfo) {
// record the total elapsed time
// record the total elapsed time
pQInfo
->
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
pQInfo
->
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
assert
(
taosArrayGetSize
(
pQInfo
->
pTableList
)
==
1
);
assert
(
pQInfo
->
groupInfo
.
numOfTables
==
1
);
/* check if query is killed or not */
/* check if query is killed or not */
if
(
isQueryKilled
(
pQInfo
))
{
if
(
isQueryKilled
(
pQInfo
))
{
dTrace
(
"QInfo:%p query is killed"
,
pQInfo
);
dTrace
(
"QInfo:%p query is killed"
,
pQInfo
);
}
else
{
}
else
{
STableId
*
pTableId
=
taosArrayGet
(
pQInfo
->
pTableList
,
0
);
// STableId* pTableId = taosArrayGet(pQInfo->groupInfo
, 0);
dTrace
(
"QInfo:%p uid:%"
PRIu64
" tid:%d, query completed, %"
PRId64
" rows returned, numOfTotal:%"
PRId64
" rows"
,
//
dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo
,
pTableId
->
uid
,
pTableId
->
tid
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
);
//
pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
}
sem_post
(
&
pQInfo
->
dataReady
);
sem_post
(
&
pQInfo
->
dataReady
);
...
@@ -5175,8 +5198,7 @@ static void stableQueryImpl(SQInfo* pQInfo) {
...
@@ -5175,8 +5198,7 @@ static void stableQueryImpl(SQInfo* pQInfo) {
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType);
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType);
if
(
pQuery
->
rec
.
rows
==
0
)
{
if
(
pQuery
->
rec
.
rows
==
0
)
{
int32_t
numOfTables
=
taosArrayGetSize
(
pQInfo
->
pTableList
);
dTrace
(
"QInfo:%p over, %d tables queried, %d points are returned"
,
pQInfo
,
pQInfo
->
groupInfo
.
numOfTables
,
pQuery
->
rec
.
total
);
dTrace
(
"QInfo:%p over, %d tables queried, %d points are returned"
,
pQInfo
,
numOfTables
,
pQuery
->
rec
.
total
);
// vnodePrintQueryStatistics(pSupporter);
// vnodePrintQueryStatistics(pSupporter);
}
}
...
@@ -5710,10 +5732,10 @@ static void doUpdateExprColumnIndex(SQuery* pQuery) {
...
@@ -5710,10 +5732,10 @@ static void doUpdateExprColumnIndex(SQuery* pQuery) {
}
}
static
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SSqlGroupbyExpr
*
pGroupbyExpr
,
SSqlFunctionExpr
*
pExprs
,
static
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SSqlGroupbyExpr
*
pGroupbyExpr
,
SSqlFunctionExpr
*
pExprs
,
S
Array
*
pTableList
)
{
S
TableGroupInfo
*
groupInfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
calloc
(
1
,
sizeof
(
SQInfo
));
SQInfo
*
pQInfo
=
(
SQInfo
*
)
calloc
(
1
,
sizeof
(
SQInfo
));
if
(
pQInfo
==
NULL
)
{
if
(
pQInfo
==
NULL
)
{
goto
_clean_pQInfo_memory
;
return
NULL
;
}
}
SQuery
*
pQuery
=
calloc
(
1
,
sizeof
(
SQuery
));
SQuery
*
pQuery
=
calloc
(
1
,
sizeof
(
SQuery
));
...
@@ -5808,7 +5830,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
...
@@ -5808,7 +5830,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
// to make sure third party won't overwrite this structure
// to make sure third party won't overwrite this structure
pQInfo
->
signature
=
pQInfo
;
pQInfo
->
signature
=
pQInfo
;
pQInfo
->
pTableList
=
pTableList
;
pQInfo
->
groupInfo
=
*
groupInfo
;
pQuery
->
pos
=
-
1
;
pQuery
->
pos
=
-
1
;
...
@@ -5842,7 +5864,6 @@ _clean_memory:
...
@@ -5842,7 +5864,6 @@ _clean_memory:
tfree
(
pExprs
);
tfree
(
pExprs
);
tfree
(
pGroupbyExpr
);
tfree
(
pGroupbyExpr
);
_clean_pQInfo_memory:
tfree
(
pQInfo
);
tfree
(
pQInfo
);
return
NULL
;
return
NULL
;
...
@@ -5918,14 +5939,12 @@ static void freeQInfo(SQInfo *pQInfo) {
...
@@ -5918,14 +5939,12 @@ static void freeQInfo(SQInfo *pQInfo) {
sem_destroy
(
&
(
pQInfo
->
dataReady
));
sem_destroy
(
&
(
pQInfo
->
dataReady
));
teardownQueryRuntimeEnv
(
&
pQInfo
->
runtimeEnv
);
teardownQueryRuntimeEnv
(
&
pQInfo
->
runtimeEnv
);
if
(
pQInfo
->
pTableDataInfo
!=
NULL
)
{
// if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableList);
// size_t num = taosHashGetSize(pQInfo->groupInfo);
for
(
int32_t
j
=
0
;
j
<
0
;
++
j
)
{
// for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo
(
pQInfo
->
pTableDataInfo
[
j
].
pTableQInfo
,
pQuery
->
numOfOutputCols
);
// destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
// }
}
// }
tfree
(
pQInfo
->
pTableDataInfo
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfFilterCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfFilterCols
;
++
i
)
{
SSingleColumnFilterInfo
*
pColFilter
=
&
pQuery
->
pFilterInfo
[
i
];
SSingleColumnFilterInfo
*
pColFilter
=
&
pQuery
->
pFilterInfo
[
i
];
...
@@ -5958,7 +5977,7 @@ static void freeQInfo(SQInfo *pQInfo) {
...
@@ -5958,7 +5977,7 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree
(
pQuery
->
pGroupbyExpr
);
tfree
(
pQuery
->
pGroupbyExpr
);
tfree
(
pQuery
);
tfree
(
pQuery
);
taosArrayDestroy
(
pQInfo
->
pTable
List
);
taosArrayDestroy
(
pQInfo
->
groupInfo
.
pGroup
List
);
dTrace
(
"QInfo:%p QInfo is freed"
,
pQInfo
);
dTrace
(
"QInfo:%p QInfo is freed"
,
pQInfo
);
...
@@ -6061,7 +6080,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
...
@@ -6061,7 +6080,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
}
}
bool
isSTableQuery
=
false
;
bool
isSTableQuery
=
false
;
S
Array
*
pGroupList
=
NULL
;
S
TableGroupInfo
*
groupInfo
=
calloc
(
1
,
sizeof
(
STableGroupInfo
))
;
if
((
pQueryMsg
->
queryType
&
TSDB_QUERY_TYPE_STABLE_QUERY
)
!=
0
)
{
if
((
pQueryMsg
->
queryType
&
TSDB_QUERY_TYPE_STABLE_QUERY
)
!=
0
)
{
isSTableQuery
=
true
;
isSTableQuery
=
true
;
...
@@ -6069,8 +6088,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
...
@@ -6069,8 +6088,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
STableId
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
STableId
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
id
->
uid
=
-
1
;
//todo fix me
id
->
uid
=
-
1
;
//todo fix me
/*int32_t ret =*/
tsdbQueryTags
(
tsdb
,
id
->
uid
,
tagCond
,
pQueryMsg
->
tagCondLen
,
&
pGroupList
,
pGroupColIndex
,
pQueryMsg
->
numOfGroupCols
);
/*int32_t ret =*/
tsdbQueryTags
(
tsdb
,
id
->
uid
,
tagCond
,
pQueryMsg
->
tagCondLen
,
groupInfo
,
pGroupColIndex
,
pQueryMsg
->
numOfGroupCols
);
if
(
taosArrayGetSize
(
pGroupList
)
==
0
)
{
// no qualified tables no need to do query
if
(
groupInfo
->
numOfTables
==
0
)
{
// no qualified tables no need to do query
code
=
TSDB_CODE_SUCCESS
;
code
=
TSDB_CODE_SUCCESS
;
goto
_query_over
;
goto
_query_over
;
}
}
...
@@ -6078,12 +6097,12 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
...
@@ -6078,12 +6097,12 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
assert
(
taosArrayGetSize
(
pTableIdList
)
==
1
);
assert
(
taosArrayGetSize
(
pTableIdList
)
==
1
);
STableId
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
STableId
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
if
((
code
=
tsdbGetOneTableGroup
(
tsdb
,
id
->
uid
,
&
pGroupList
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
tsdbGetOneTableGroup
(
tsdb
,
id
->
uid
,
groupInfo
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_query_over
;
goto
_query_over
;
}
}
}
}
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
pGroupbyExpr
,
pExprs
,
pGroupList
);
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
pGroupbyExpr
,
pExprs
,
groupInfo
);
if
((
*
pQInfo
)
==
NULL
)
{
if
((
*
pQInfo
)
==
NULL
)
{
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
}
...
...
src/tsdb/inc/tsdb.h
浏览文件 @
0d2b5ca6
...
@@ -95,6 +95,8 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
...
@@ -95,6 +95,8 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int
tsdbTableSetTagValue
(
STableCfg
*
config
,
SDataRow
row
,
bool
dup
);
int
tsdbTableSetTagValue
(
STableCfg
*
config
,
SDataRow
row
,
bool
dup
);
void
tsdbClearTableCfg
(
STableCfg
*
config
);
void
tsdbClearTableCfg
(
STableCfg
*
config
);
int32_t
tsdbGetTableTagVal
(
tsdb_repo_t
*
repo
,
STableId
id
,
int32_t
col
,
int16_t
*
type
,
int16_t
*
bytes
,
char
**
val
);
int
tsdbCreateTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
int
tsdbCreateTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
int
tsdbDropTable
(
tsdb_repo_t
*
pRepo
,
STableId
tableId
);
int
tsdbDropTable
(
tsdb_repo_t
*
pRepo
,
STableId
tableId
);
int
tsdbAlterTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
int
tsdbAlterTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
...
@@ -181,11 +183,6 @@ int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg);
...
@@ -181,11 +183,6 @@ int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg);
typedef
void
*
tsdb_query_handle_t
;
// Use void to hide implementation details
typedef
void
*
tsdb_query_handle_t
;
// Use void to hide implementation details
typedef
struct
STableGroupList
{
// qualified table object list in group
SArray
*
pGroupList
;
int32_t
numOfTables
;
}
STableGroupList
;
// query condition to build vnode iterator
// query condition to build vnode iterator
typedef
struct
STsdbQueryCond
{
typedef
struct
STsdbQueryCond
{
STimeWindow
twindow
;
STimeWindow
twindow
;
...
@@ -216,6 +213,11 @@ typedef struct SDataBlockInfo {
...
@@ -216,6 +213,11 @@ typedef struct SDataBlockInfo {
int32_t
sid
;
int32_t
sid
;
}
SDataBlockInfo
;
}
SDataBlockInfo
;
typedef
struct
{
size_t
numOfTables
;
SArray
*
pGroupList
;
}
STableGroupInfo
;
typedef
struct
{
typedef
struct
{
}
SFields
;
}
SFields
;
...
@@ -235,7 +237,7 @@ typedef void *tsdbpos_t;
...
@@ -235,7 +237,7 @@ typedef void *tsdbpos_t;
* @param pTableList table sid list
* @param pTableList table sid list
* @return
* @return
*/
*/
tsdb_query_handle_t
*
tsdbQueryTables
(
tsdb_repo_t
*
tsdb
,
STsdbQueryCond
*
pCond
,
S
Array
*
idList
,
SArray
*
pColumnInfo
);
tsdb_query_handle_t
*
tsdbQueryTables
(
tsdb_repo_t
*
tsdb
,
STsdbQueryCond
*
pCond
,
S
TableGroupInfo
*
groupInfo
,
SArray
*
pColumnInfo
);
/**
/**
* move to next block
* move to next block
...
@@ -337,10 +339,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
...
@@ -337,10 +339,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
* @param pTagCond. tag query condition
* @param pTagCond. tag query condition
*
*
*/
*/
int32_t
tsdbQueryTags
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
S
Array
*
*
pGroupList
,
int32_t
tsdbQueryTags
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
S
TableGroupInfo
*
pGroupList
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
);
SColIndex
*
pColIndex
,
int32_t
numOfCols
);
int32_t
tsdbGetOneTableGroup
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
S
Array
**
pGroupList
);
int32_t
tsdbGetOneTableGroup
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
S
TableGroupInfo
*
pGroupInfo
);
/**
/**
* clean up the query handle
* clean up the query handle
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
0d2b5ca6
...
@@ -214,6 +214,23 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
...
@@ -214,6 +214,23 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
}
}
}
}
int32_t
tsdbGetTableTagVal
(
tsdb_repo_t
*
repo
,
STableId
id
,
int32_t
col
,
int16_t
*
type
,
int16_t
*
bytes
,
char
**
val
)
{
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
repo
);
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
id
.
uid
);
STSchema
*
pSchema
=
tsdbGetTableTagSchema
(
pMeta
,
pTable
);
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
col
);
SDataRow
row
=
(
SDataRow
)
pTable
->
tagVal
;
char
*
d
=
dataRowAt
(
row
,
TD_DATA_ROW_HEAD_SIZE
);
*
val
=
d
;
*
type
=
pCol
->
type
;
*
bytes
=
pCol
->
bytes
;
return
0
;
}
int32_t
tsdbCreateTableImpl
(
STsdbMeta
*
pMeta
,
STableCfg
*
pCfg
)
{
int32_t
tsdbCreateTableImpl
(
STsdbMeta
*
pMeta
,
STableCfg
*
pCfg
)
{
if
(
tsdbCheckTableCfg
(
pCfg
)
<
0
)
return
-
1
;
if
(
tsdbCheckTableCfg
(
pCfg
)
<
0
)
return
-
1
;
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
0d2b5ca6
...
@@ -143,7 +143,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
...
@@ -143,7 +143,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo
->
fileListIndex
=
-
1
;
pCompBlockLoadInfo
->
fileListIndex
=
-
1
;
}
}
tsdb_query_handle_t
*
tsdbQueryTables
(
tsdb_repo_t
*
tsdb
,
STsdbQueryCond
*
pCond
,
S
Array
*
groupList
,
SArray
*
pColumnInfo
)
{
tsdb_query_handle_t
*
tsdbQueryTables
(
tsdb_repo_t
*
tsdb
,
STsdbQueryCond
*
pCond
,
S
TableGroupInfo
*
groupList
,
SArray
*
pColumnInfo
)
{
// todo 1. filter not exist table
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
// todo 2. add the reference count for each table that is involved in query
...
@@ -157,22 +157,25 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
...
@@ -157,22 +157,25 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
pQueryHandle
->
isFirstSlot
=
true
;
pQueryHandle
->
isFirstSlot
=
true
;
pQueryHandle
->
cur
.
fid
=
-
1
;
pQueryHandle
->
cur
.
fid
=
-
1
;
size_t
size
=
taosArrayGetSize
(
g
roupList
);
size_t
size
OfGroup
=
taosArrayGetSize
(
groupList
->
pG
roupList
);
assert
(
size
>=
1
);
assert
(
size
OfGroup
>=
1
);
pQueryHandle
->
pTableCheckInfo
=
taosArrayInit
(
size
,
sizeof
(
STableCheckInfo
));
pQueryHandle
->
pTableCheckInfo
=
taosArrayInit
(
groupList
->
numOfTables
,
sizeof
(
STableCheckInfo
));
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SArray
*
group
=
*
(
SArray
**
)
taosArrayGet
(
groupList
,
i
);
for
(
int32_t
i
=
0
;
i
<
sizeOfGroup
;
++
i
)
{
SArray
*
group
=
*
(
SArray
**
)
taosArrayGet
(
groupList
->
pGroupList
,
i
);
size_t
gsize
=
taosArrayGetSize
(
group
);
size_t
gsize
=
taosArrayGetSize
(
group
);
assert
(
gsize
>
0
);
for
(
int32_t
j
=
0
;
j
<
gsize
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
gsize
;
++
j
)
{
S
Table
*
pTable
=
*
(
STable
**
)
taosArrayGet
(
group
,
j
);
S
Pair
*
d
=
(
SPair
*
)
taosArrayGet
(
group
,
j
);
assert
(
pTable
!=
NULL
);
assert
(
d
->
first
!=
NULL
);
STableCheckInfo
info
=
{
STableCheckInfo
info
=
{
.
lastKey
=
pQueryHandle
->
window
.
skey
,
.
lastKey
=
pQueryHandle
->
window
.
skey
,
.
tableId
=
pTable
->
tableId
,
.
tableId
=
((
STable
*
)
d
->
first
)
->
tableId
,
.
pTableObj
=
pTable
,
.
pTableObj
=
d
->
first
,
};
};
taosArrayPush
(
pQueryHandle
->
pTableCheckInfo
,
&
info
);
taosArrayPush
(
pQueryHandle
->
pTableCheckInfo
,
&
info
);
...
@@ -1143,7 +1146,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
...
@@ -1143,7 +1146,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
SSkipListNode
*
pNode
=
tSkipListIterGet
(
iter
);
SSkipListNode
*
pNode
=
tSkipListIterGet
(
iter
);
STable
*
t
=
*
(
STable
**
)
SL_GET_NODE_DATA
(
pNode
);
STable
*
t
=
*
(
STable
**
)
SL_GET_NODE_DATA
(
pNode
);
taosArrayPush
(
list
,
t
);
taosArrayPush
(
list
,
&
t
);
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -1306,26 +1309,31 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
...
@@ -1306,26 +1309,31 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
}
}
void
createTableGroupImpl
(
SArray
*
pGroups
,
STable
**
pTables
,
size_t
numOfTables
,
STableGroupSupporter
*
pSupp
,
__ext_compar_fn_t
compareFn
)
{
void
createTableGroupImpl
(
SArray
*
pGroups
,
STable
**
pTables
,
size_t
numOfTables
,
STableGroupSupporter
*
pSupp
,
__ext_compar_fn_t
compareFn
)
{
SArray
*
g
=
taosArrayInit
(
16
,
POINTER_BYTES
);
SArray
*
g
=
taosArrayInit
(
16
,
sizeof
(
SPair
));
taosArrayPush
(
g
,
&
pTables
[
0
]);
SPair
p
=
{.
first
=
pTables
[
0
]};
taosArrayPush
(
g
,
&
p
);
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
int32_t
ret
=
compareFn
(
&
pTables
[
i
-
1
],
&
pTables
[
i
],
pSupp
);
int32_t
ret
=
compareFn
(
&
pTables
[
i
-
1
],
&
pTables
[
i
],
pSupp
);
assert
(
ret
==
0
||
ret
==
-
1
);
assert
(
ret
==
0
||
ret
==
-
1
);
if
(
ret
==
0
)
{
if
(
ret
==
0
)
{
taosArrayPush
(
g
,
&
pTables
[
i
]);
SPair
p1
=
{.
first
=
pTables
[
i
]};
taosArrayPush
(
g
,
&
p1
);
}
else
{
}
else
{
taosArrayPush
(
pGroups
,
&
g
);
// current group is ended, start a new group
taosArrayPush
(
pGroups
,
&
g
);
// current group is ended, start a new group
g
=
taosArrayInit
(
16
,
POINTER_BYTES
);
g
=
taosArrayInit
(
16
,
POINTER_BYTES
);
taosArrayPush
(
g
,
&
pTables
[
i
]);
SPair
p1
=
{.
first
=
pTables
[
i
]};
taosArrayPush
(
g
,
&
p1
);
}
}
}
}
}
}
SArray
*
createTableGroup
(
SArray
*
pTableList
,
STSchema
*
pTagSchema
,
SColIndex
*
pCols
,
int32_t
numOfOrderCols
)
{
SArray
*
createTableGroup
(
SArray
*
pTableList
,
STSchema
*
pTagSchema
,
SColIndex
*
pCols
,
int32_t
numOfOrderCols
)
{
assert
(
pTableList
!=
NULL
&&
taosArrayGetSize
(
pTableList
)
>
0
);
assert
(
pTableList
!=
NULL
);
SArray
*
pTableGroup
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
pTableGroup
=
taosArrayInit
(
1
,
POINTER_BYTES
);
size_t
size
=
taosArrayGetSize
(
pTableList
);
size_t
size
=
taosArrayGetSize
(
pTableList
);
...
@@ -1335,7 +1343,17 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
...
@@ -1335,7 +1343,17 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
}
if
(
numOfOrderCols
==
0
||
size
==
1
)
{
// no group by tags clause or only one table
if
(
numOfOrderCols
==
0
||
size
==
1
)
{
// no group by tags clause or only one table
taosArrayPush
(
pTableGroup
,
pTableList
);
size_t
num
=
taosArrayGetSize
(
pTableList
);
SArray
*
sa
=
taosArrayInit
(
num
,
sizeof
(
SPair
));
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STable
*
pTable
=
taosArrayGetP
(
pTableList
,
i
);
SPair
p
=
{.
first
=
pTable
};
taosArrayPush
(
sa
,
&
p
);
}
taosArrayPush
(
pTableGroup
,
&
sa
);
pTrace
(
"all %d tables belong to one group"
,
size
);
pTrace
(
"all %d tables belong to one group"
,
size
);
#ifdef _DEBUG_VIEW
#ifdef _DEBUG_VIEW
...
@@ -1430,7 +1448,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
...
@@ -1430,7 +1448,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
tsdbQueryTags
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
S
Array
**
pGroupList
,
int32_t
tsdbQueryTags
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
S
TableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
)
{
SColIndex
*
pColIndex
,
int32_t
numOfCols
)
{
STable
*
pSTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
STable
*
pSTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
...
@@ -1449,8 +1467,8 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
...
@@ -1449,8 +1467,8 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
return
ret
;
return
ret
;
}
}
*
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCol
s
);
pGroupInfo
->
numOfTables
=
taosArrayGetSize
(
re
s
);
taosArrayDestroy
(
re
s
);
pGroupInfo
->
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCol
s
);
return
ret
;
return
ret
;
}
}
...
@@ -1465,25 +1483,27 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
...
@@ -1465,25 +1483,27 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
}
}
doQueryTableList
(
pSTable
,
res
,
pExprNode
);
doQueryTableList
(
pSTable
,
res
,
pExprNode
);
*
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCols
);
taosArrayDestroy
(
res
);
pGroupInfo
->
numOfTables
=
taosArrayGetSize
(
res
);
pGroupInfo
->
pGroupList
=
createTableGroup
(
res
,
pTagSchema
,
pColIndex
,
numOfCols
);
return
ret
;
return
ret
;
}
}
int32_t
tsdbGetOneTableGroup
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
S
Array
**
pGroupList
)
{
int32_t
tsdbGetOneTableGroup
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
S
TableGroupInfo
*
pGroupInfo
)
{
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
if
(
pTable
==
NULL
)
{
if
(
pTable
==
NULL
)
{
return
TSDB_CODE_INVALID_TABLE_ID
;
return
TSDB_CODE_INVALID_TABLE_ID
;
}
}
//todo assert table type, add the table ref count
//todo assert table type, add the table ref count
pGroupInfo
->
numOfTables
=
1
;
pGroupInfo
->
pGroupList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
*
pGroupList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
group
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
group
=
taosArrayInit
(
1
,
POINTER_BYTES
);
taosArrayPush
(
group
,
&
pTable
);
taosArrayPush
(
group
,
&
pTable
);
taosArrayPush
(
*
pGroupList
,
&
group
);
taosArrayPush
(
pGroupInfo
->
pGroupList
,
&
group
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
...
src/util/inc/tutil.h
浏览文件 @
0d2b5ca6
...
@@ -107,6 +107,11 @@ extern "C" {
...
@@ -107,6 +107,11 @@ extern "C" {
#define POW2(x) ((x) * (x))
#define POW2(x) ((x) * (x))
typedef
struct
SPair
{
void
*
first
;
void
*
sec
;
}
SPair
;
int32_t
strdequote
(
char
*
src
);
int32_t
strdequote
(
char
*
src
);
void
strtrim
(
char
*
src
);
void
strtrim
(
char
*
src
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录