Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
a0fe1d3c
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a0fe1d3c
编写于
10月 17, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
10月 17, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #3893 from taosdata/feature/query
Feature/query
上级
5acc9c28
d4ad59df
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
249 addition
and
216 deletion
+249
-216
src/client/inc/tscSubquery.h
src/client/inc/tscSubquery.h
+1
-1
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+0
-1
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+35
-20
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+45
-23
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+25
-28
src/query/inc/qExtbuffer.h
src/query/inc/qExtbuffer.h
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+3
-1
src/query/src/qExtbuffer.c
src/query/src/qExtbuffer.c
+10
-8
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+129
-133
未找到文件。
src/client/inc/tscSubquery.h
浏览文件 @
a0fe1d3c
...
...
@@ -28,7 +28,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void
tscSetupOutputColumnIndex
(
SSqlObj
*
pSql
);
void
tscJoinQueryCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
);
SJoinSupporter
*
tscCreateJoinSupporter
(
SSqlObj
*
pSql
,
SSubqueryState
*
pState
,
int32_t
index
);
SJoinSupporter
*
tscCreateJoinSupporter
(
SSqlObj
*
pSql
,
int32_t
index
);
void
tscHandleMasterJoinQuery
(
SSqlObj
*
pSql
);
...
...
src/client/inc/tscUtil.h
浏览文件 @
a0fe1d3c
...
...
@@ -109,7 +109,6 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
void
*
tscDestroyBlockArrayList
(
SArray
*
pDataBlockList
);
int32_t
tscCopyDataBlockToPayload
(
SSqlObj
*
pSql
,
STableDataBlocks
*
pDataBlock
);
void
tscFreeUnusedDataBlocks
(
SArray
*
pDataBlockList
);
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
SArray
*
pDataList
);
int32_t
tscGetDataBlockFromList
(
void
*
pHashList
,
SArray
*
pDataBlockList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
const
char
*
tableId
,
STableMeta
*
pTableMeta
,
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
a0fe1d3c
...
...
@@ -554,27 +554,48 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
numOfGroupByCols
++
;
}
int32_t
*
order
Idx
=
(
int32_t
*
)
calloc
(
numOfGroupByCols
,
sizeof
(
int32_t
));
if
(
order
Idx
==
NULL
)
{
int32_t
*
order
ColIndexList
=
(
int32_t
*
)
calloc
(
numOfGroupByCols
,
sizeof
(
int32_t
));
if
(
order
ColIndexList
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
if
(
numOfGroupByCols
>
0
)
{
int32_t
startCols
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
// tags value locate at the last columns
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
++
i
)
{
orderIdx
[
i
]
=
startCols
++
;
}
if
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
>
0
)
{
int32_t
startCols
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
// the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
++
i
)
{
orderColIndexList
[
i
]
=
startCols
++
;
}
if
(
pQueryInfo
->
interval
.
interval
!=
0
)
{
// the first column is the timestamp, handles queries like "interval(10m) group by tags"
orderColIndexList
[
numOfGroupByCols
-
1
]
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
//TODO ???
}
}
else
{
/*
* 1. the orderby ts asc/desc projection query for the super table
* 2. interval query without groupby clause
*/
if
(
pQueryInfo
->
interval
.
interval
!=
0
)
{
orderColIndexList
[
0
]
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
}
else
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
functionId
==
TSDB_FUNC_PRJ
&&
pExpr
->
colInfo
.
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
orderColIndexList
[
0
]
=
i
;
}
}
}
if
(
pQueryInfo
->
interval
.
interval
!=
0
)
{
// the first column is the timestamp, handles queries like "interval(10m) group by tags"
orderIdx
[
numOfGroupByCols
-
1
]
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
assert
(
pQueryInfo
->
order
.
orderColId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
}
}
*
pOrderDesc
=
tOrderDesCreate
(
order
Idx
,
numOfGroupByCols
,
pModel
,
pQueryInfo
->
order
.
order
);
taosTFree
(
order
Idx
);
*
pOrderDesc
=
tOrderDesCreate
(
order
ColIndexList
,
numOfGroupByCols
,
pModel
,
pQueryInfo
->
order
.
order
);
taosTFree
(
order
ColIndexList
);
if
(
*
pOrderDesc
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -588,7 +609,6 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
// disable merge procedure for column projection query
int16_t
functionId
=
pReducer
->
pCtx
[
0
].
functionId
;
assert
(
functionId
!=
TSDB_FUNC_ARITHM
);
if
(
pReducer
->
orderPrjOnSTable
)
{
return
true
;
}
...
...
@@ -606,7 +626,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
return
true
;
}
if
(
orderInfo
->
pData
[
numOfCols
-
1
]
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
if
(
orderInfo
->
colIndex
[
numOfCols
-
1
]
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
/*
* super table interval query
* if the order columns is the primary timestamp, all result data belongs to one group
...
...
@@ -620,7 +640,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
// only one row exists
int32_t
index
=
orderInfo
->
pData
[
0
];
int32_t
index
=
orderInfo
->
colIndex
[
0
];
int32_t
offset
=
(
pOrderDesc
->
pColumnModel
)
->
pFields
[
index
].
offset
;
int32_t
ret
=
memcmp
(
pPrev
+
offset
,
tmpBuffer
->
data
+
offset
,
pOrderDesc
->
pColumnModel
->
rowSize
-
offset
);
...
...
@@ -661,7 +681,6 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pSchema
[
i
].
bytes
=
pExpr
->
resBytes
;
pSchema
[
i
].
type
=
(
int8_t
)
pExpr
->
resType
;
rlen
+=
pExpr
->
resBytes
;
}
...
...
@@ -701,12 +720,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
int16_t
type
=
-
1
;
int16_t
bytes
=
0
;
// if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
// (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) ||
// pExpr->functionId == TSDB_FUNC_LAST_ROW) {
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
int32_t
functionId
=
pExpr
->
functionId
;
if
(
functionId
>=
TSDB_FUNC_TS
&&
functionId
<=
TSDB_FUNC_DIFF
)
{
type
=
pModel
->
pFields
[
i
].
field
.
type
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
a0fe1d3c
...
...
@@ -626,6 +626,11 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
return
TSDB_CODE_SUCCESS
;
}
// orderby column not set yet, set it to be the primary timestamp column
if
(
pQueryInfo
->
order
.
orderColId
==
INT32_MIN
)
{
pQueryInfo
->
order
.
orderColId
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
}
// interval is not null
SStrToken
*
t
=
&
pQuerySql
->
interval
;
if
(
parseNatualDuration
(
t
->
z
,
t
->
n
,
&
pQueryInfo
->
interval
.
interval
,
&
pQueryInfo
->
interval
.
intervalUnit
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1347,6 +1352,32 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn
insertResultField
(
pQueryInfo
,
startPos
,
&
ids
,
pExpr
->
resBytes
,
(
int8_t
)
pExpr
->
resType
,
pExpr
->
aliasName
,
pExpr
);
}
static
void
addPrimaryTsColIntoResult
(
SQueryInfo
*
pQueryInfo
)
{
// primary timestamp column has been added already
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
functionId
==
TSDB_FUNC_PRJ
&&
pExpr
->
colInfo
.
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
;
}
}
SColumnIndex
index
=
{
0
};
// set the constant column value always attached to first table.
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
SSchema
*
pSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
// add the timestamp column into the output columns
int32_t
numOfCols
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pQueryInfo
);
tscAddSpecialColumnForSelect
(
pQueryInfo
,
numOfCols
,
TSDB_FUNC_PRJ
,
&
index
,
pSchema
,
TSDB_COL_NORMAL
);
SFieldSupInfo
*
pSupInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
numOfCols
);
pSupInfo
->
visible
=
false
;
pQueryInfo
->
type
|=
TSDB_QUERY_TYPE_PROJECTION_QUERY
;
}
int32_t
parseSelectClause
(
SSqlCmd
*
pCmd
,
int32_t
clauseIndex
,
tSQLExprList
*
pSelection
,
bool
isSTable
,
bool
joinQuery
)
{
assert
(
pSelection
!=
NULL
&&
pCmd
!=
NULL
);
...
...
@@ -1400,20 +1431,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
// there is only one user-defined column in the final result field, add the timestamp column.
size_t
numOfSrcCols
=
taosArrayGetSize
(
pQueryInfo
->
colList
);
if
(
numOfSrcCols
<=
0
&&
!
tscQueryTags
(
pQueryInfo
))
{
SColumnIndex
index
=
{
0
};
// set the constant column value always attached to first table.
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
clauseIndex
,
0
);
SSchema
*
pSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
// add the timestamp column into the output columns
int32_t
numOfCols
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pQueryInfo
);
tscAddSpecialColumnForSelect
(
pQueryInfo
,
numOfCols
,
TSDB_FUNC_PRJ
,
&
index
,
pSchema
,
TSDB_COL_NORMAL
);
SFieldSupInfo
*
pSupInfo
=
tscFieldInfoGetSupp
(
&
pQueryInfo
->
fieldsInfo
,
numOfCols
);
pSupInfo
->
visible
=
false
;
pQueryInfo
->
type
|=
TSDB_QUERY_TYPE_PROJECTION_QUERY
;
addPrimaryTsColIntoResult
(
pQueryInfo
);
}
if
(
!
functionCompatibleCheck
(
pQueryInfo
,
joinQuery
))
{
...
...
@@ -4371,14 +4389,13 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
static
void
setDefaultOrderInfo
(
SQueryInfo
*
pQueryInfo
)
{
/* set default timestamp order information for all queries */
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
if
(
isTopBottomQuery
(
pQueryInfo
))
{
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
pQueryInfo
->
order
.
orderColId
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
}
else
{
pQueryInfo
->
order
.
orderColId
=
-
1
;
}
else
{
// in case of select tbname from super_table, the defualt order column can not be the primary ts column
pQueryInfo
->
order
.
orderColId
=
INT32_MIN
;
}
/* for super table query, set default ascending order for group output */
...
...
@@ -4482,6 +4499,11 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
}
else
{
pQueryInfo
->
order
.
order
=
pSortorder
->
a
[
0
].
sortOrder
;
pQueryInfo
->
order
.
orderColId
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
// orderby ts query on super table
if
(
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
addPrimaryTsColIntoResult
(
pQueryInfo
);
}
}
}
...
...
@@ -6277,6 +6299,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return
TSDB_CODE_TSC_INVALID_SQL
;
}
// set order by info
if
(
parseOrderbyClause
(
pCmd
,
pQueryInfo
,
pQuerySql
,
tscGetTableSchema
(
pTableMetaInfo
->
pTableMeta
))
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
// set interval value
if
(
parseIntervalClause
(
pCmd
,
pQueryInfo
,
pQuerySql
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
...
...
@@ -6287,11 +6314,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
}
}
// set order by info
if
(
parseOrderbyClause
(
pCmd
,
pQueryInfo
,
pQuerySql
,
tscGetTableSchema
(
pTableMetaInfo
->
pTableMeta
))
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
// user does not specified the query time window, twa is not allowed in such case.
if
((
pQueryInfo
->
window
.
skey
==
INT64_MIN
||
pQueryInfo
->
window
.
ekey
==
INT64_MAX
||
(
pQueryInfo
->
window
.
ekey
==
INT64_MAX
/
1000
&&
tinfo
.
precision
==
TSDB_TIME_PRECISION_MILLI
))
&&
tscIsTWAQuery
(
pQueryInfo
))
{
...
...
src/client/src/tscSubquery.c
浏览文件 @
a0fe1d3c
...
...
@@ -166,7 +166,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
}
// todo handle failed to create sub query
SJoinSupporter
*
tscCreateJoinSupporter
(
SSqlObj
*
pSql
,
SSubqueryState
*
pState
,
int32_t
index
)
{
SJoinSupporter
*
tscCreateJoinSupporter
(
SSqlObj
*
pSql
,
int32_t
index
)
{
SJoinSupporter
*
pSupporter
=
calloc
(
1
,
sizeof
(
SJoinSupporter
));
if
(
pSupporter
==
NULL
)
{
return
NULL
;
...
...
@@ -1300,11 +1300,11 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
int32_t
code
=
TSDB_CODE_SUCCESS
;
// todo add test
SSubqueryState
*
pState
=
calloc
(
1
,
sizeof
(
SSubqueryState
));
if
(
pState
==
NULL
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
//
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
//
if (pState == NULL) {
//
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
//
goto _error;
//
}
pSql
->
subState
.
numOfSub
=
pQueryInfo
->
numOfTables
;
...
...
@@ -1312,7 +1312,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscDebug
(
"%p start subquery, total:%d"
,
pSql
,
pQueryInfo
->
numOfTables
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
SJoinSupporter
*
pSupporter
=
tscCreateJoinSupporter
(
pSql
,
pState
,
i
);
SJoinSupporter
*
pSupporter
=
tscCreateJoinSupporter
(
pSql
,
i
);
if
(
pSupporter
==
NULL
)
{
// failed to create support struct, abort current query
tscError
(
"%p tableIndex:%d, failed to allocate join support object, abort further query"
,
pSql
,
i
);
...
...
@@ -1357,8 +1357,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscQueueAsyncRes
(
pSql
);
}
static
void
doCleanupSubqueries
(
SSqlObj
*
pSql
,
int32_t
numOfSubs
,
SSubqueryState
*
pState
)
{
assert
(
numOfSubs
<=
pSql
->
subState
.
numOfSub
&&
numOfSubs
>=
0
&&
pState
!=
NULL
);
static
void
doCleanupSubqueries
(
SSqlObj
*
pSql
,
int32_t
numOfSubs
)
{
assert
(
numOfSubs
<=
pSql
->
subState
.
numOfSub
&&
numOfSubs
>=
0
);
for
(
int32_t
i
=
0
;
i
<
numOfSubs
;
++
i
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
...
...
@@ -1371,8 +1371,6 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState
taos_free_result
(
pSub
);
}
free
(
pState
);
}
int32_t
tscHandleMasterSTableQuery
(
SSqlObj
*
pSql
)
{
...
...
@@ -1395,9 +1393,10 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
pSql
->
subState
.
numOfSub
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
assert
(
pSql
->
subState
.
numOfSub
>
0
);
SSubqueryState
*
pState
=
&
pSql
->
subState
;
pState
->
numOfSub
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
assert
(
pState
->
numOfSub
>
0
);
int32_t
ret
=
tscLocalReducerEnvCreate
(
pSql
,
&
pMemoryBuf
,
&
pDesc
,
&
pModel
,
nBufferSize
);
if
(
ret
!=
0
)
{
...
...
@@ -1407,26 +1406,24 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return
ret
;
}
pSql
->
pSubs
=
calloc
(
pS
ql
->
subState
.
numOfSub
,
POINTER_BYTES
);
pSql
->
pSubs
=
calloc
(
pS
tate
->
numOfSub
,
POINTER_BYTES
);
tscDebug
(
"%p retrieved query data from %d vnode(s)"
,
pSql
,
pSql
->
subState
.
numOfSub
);
SSubqueryState
*
pState
=
calloc
(
1
,
sizeof
(
SSubqueryState
));
tscDebug
(
"%p retrieved query data from %d vnode(s)"
,
pSql
,
pState
->
numOfSub
);
if
(
pSql
->
pSubs
==
NULL
||
pState
==
NULL
)
{
taosTFree
(
pState
);
if
(
pSql
->
pSubs
==
NULL
)
{
taosTFree
(
pSql
->
pSubs
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pS
ql
->
subState
.
numOfSub
);
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pS
tate
->
numOfSub
);
tscQueueAsyncRes
(
pSql
);
return
ret
;
}
pS
ql
->
subState
.
numOfRemain
=
pSql
->
subState
.
numOfSub
;
pS
tate
->
numOfRemain
=
pState
->
numOfSub
;
pRes
->
code
=
TSDB_CODE_SUCCESS
;
int32_t
i
=
0
;
for
(;
i
<
pS
ql
->
subState
.
numOfSub
;
++
i
)
{
for
(;
i
<
pS
tate
->
numOfSub
;
++
i
)
{
SRetrieveSupport
*
trs
=
(
SRetrieveSupport
*
)
calloc
(
1
,
sizeof
(
SRetrieveSupport
));
if
(
trs
==
NULL
)
{
tscError
(
"%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s"
,
pSql
,
i
,
strerror
(
errno
));
...
...
@@ -1465,22 +1462,22 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug
(
"%p sub:%p create subquery success. orderOfSub:%d"
,
pSql
,
pNew
,
trs
->
subqueryIndex
);
}
if
(
i
<
pS
ql
->
subState
.
numOfSub
)
{
if
(
i
<
pS
tate
->
numOfSub
)
{
tscError
(
"%p failed to prepare subquery structure and launch subqueries"
,
pSql
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pS
ql
->
subState
.
numOfSub
);
doCleanupSubqueries
(
pSql
,
i
,
pState
);
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pS
tate
->
numOfSub
);
doCleanupSubqueries
(
pSql
,
i
);
return
pRes
->
code
;
// free all allocated resource
}
if
(
pRes
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
)
{
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pS
ql
->
subState
.
numOfSub
);
doCleanupSubqueries
(
pSql
,
i
,
pState
);
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pS
tate
->
numOfSub
);
doCleanupSubqueries
(
pSql
,
i
);
return
pRes
->
code
;
}
for
(
int32_t
j
=
0
;
j
<
pS
ql
->
subState
.
numOfSub
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pS
tate
->
numOfSub
;
++
j
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
j
];
SRetrieveSupport
*
pSupport
=
pSub
->
param
;
...
...
src/query/inc/qExtbuffer.h
浏览文件 @
a0fe1d3c
...
...
@@ -89,7 +89,7 @@ typedef struct SColumnModel {
typedef
struct
SColumnOrderInfo
{
int32_t
numOfCols
;
int16_t
pData
[];
int16_t
colIndex
[];
}
SColumnOrderInfo
;
typedef
struct
tOrderDescriptor
{
...
...
src/query/src/qExecutor.c
浏览文件 @
a0fe1d3c
...
...
@@ -1537,7 +1537,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
if
(
isNull
((
const
char
*
)
&
pQuery
->
fillVal
[
colIndex
],
pCtx
->
inputType
))
{
pCtx
->
param
[
1
].
nType
=
TSDB_DATA_TYPE_NULL
;
}
else
{
// todo refactor, tVariantCreateFromBinary should handle the NULL value
tVariantCreateFromBinary
(
&
pCtx
->
param
[
1
],
(
char
*
)
&
pQuery
->
fillVal
[
colIndex
],
pCtx
->
inputBytes
,
pCtx
->
inputType
);
if
(
pCtx
->
inputType
!=
TSDB_DATA_TYPE_BINARY
&&
pCtx
->
inputType
!=
TSDB_DATA_TYPE_NCHAR
)
{
tVariantCreateFromBinary
(
&
pCtx
->
param
[
1
],
(
char
*
)
&
pQuery
->
fillVal
[
colIndex
],
pCtx
->
inputBytes
,
pCtx
->
inputType
);
}
}
}
}
...
...
src/query/src/qExtbuffer.c
浏览文件 @
a0fe1d3c
...
...
@@ -343,8 +343,10 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
if
(
f1
==
f2
)
{
return
0
;
}
if
(
colIdx
==
0
&&
tsOrder
==
TSDB_ORDER_DESC
)
{
// primary column desc order
assert
(
colIdx
==
0
);
if
(
tsOrder
==
TSDB_ORDER_DESC
)
{
// primary column desc order
return
(
f1
<
f2
)
?
1
:
-
1
;
}
else
{
// asc
return
(
f1
<
f2
)
?
-
1
:
1
;
...
...
@@ -435,7 +437,7 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t
cmpCnt
=
pDescriptor
->
orderInfo
.
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
cmpCnt
;
++
i
)
{
int32_t
colIdx
=
pDescriptor
->
orderInfo
.
pData
[
i
];
int32_t
colIdx
=
pDescriptor
->
orderInfo
.
colIndex
[
i
];
char
*
f1
=
COLMODEL_GET_VAL
(
data1
,
pDescriptor
->
pColumnModel
,
numOfRows1
,
s1
,
colIdx
);
char
*
f2
=
COLMODEL_GET_VAL
(
data2
,
pDescriptor
->
pColumnModel
,
numOfRows2
,
s2
,
colIdx
);
...
...
@@ -467,7 +469,7 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t
cmpCnt
=
pDescriptor
->
orderInfo
.
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
cmpCnt
;
++
i
)
{
int32_t
colIdx
=
pDescriptor
->
orderInfo
.
pData
[
i
];
int32_t
colIdx
=
pDescriptor
->
orderInfo
.
colIndex
[
i
];
char
*
f1
=
COLMODEL_GET_VAL
(
data1
,
pDescriptor
->
pColumnModel
,
numOfRows1
,
s1
,
colIdx
);
char
*
f2
=
COLMODEL_GET_VAL
(
data2
,
pDescriptor
->
pColumnModel
,
numOfRows2
,
s2
,
colIdx
);
...
...
@@ -557,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
int32_t
midIdx
=
((
end
-
start
)
>>
1
)
+
start
;
#if defined(_DEBUG_VIEW)
int32_t
f
=
pDescriptor
->
orderInfo
.
pData
[
0
];
int32_t
f
=
pDescriptor
->
orderInfo
.
colIndex
[
0
];
char
*
midx
=
COLMODEL_GET_VAL
(
data
,
pDescriptor
->
pColumnModel
,
numOfRows
,
midIdx
,
f
);
char
*
startx
=
COLMODEL_GET_VAL
(
data
,
pDescriptor
->
pColumnModel
,
numOfRows
,
start
,
f
);
char
*
endx
=
COLMODEL_GET_VAL
(
data
,
pDescriptor
->
pColumnModel
,
numOfRows
,
end
,
f
);
int32_t
colIdx
=
pDescriptor
->
orderInfo
.
pData
[
0
];
int32_t
colIdx
=
pDescriptor
->
orderInfo
.
colIndex
[
0
];
tSortDataPrint
(
pDescriptor
->
pColumnModel
->
pFields
[
colIdx
].
field
.
type
,
"before"
,
startx
,
midx
,
endx
);
#endif
...
...
@@ -591,7 +593,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
}
static
UNUSED_FUNC
void
tRowModelDisplay
(
tOrderDescriptor
*
pDescriptor
,
int32_t
numOfRows
,
char
*
d
,
int32_t
len
)
{
int32_t
colIdx
=
pDescriptor
->
orderInfo
.
pData
[
0
];
int32_t
colIdx
=
pDescriptor
->
orderInfo
.
colIndex
[
0
];
for
(
int32_t
i
=
0
;
i
<
len
;
++
i
)
{
char
*
startx
=
COLMODEL_GET_VAL
(
d
,
pDescriptor
->
pColumnModel
,
numOfRows
,
i
,
colIdx
);
...
...
@@ -1075,7 +1077,7 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
desc
->
orderInfo
.
numOfCols
=
numOfOrderCols
;
for
(
int32_t
i
=
0
;
i
<
numOfOrderCols
;
++
i
)
{
desc
->
orderInfo
.
pData
[
i
]
=
orderColIdx
[
i
];
desc
->
orderInfo
.
colIndex
[
i
]
=
orderColIdx
[
i
];
}
return
desc
;
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
a0fe1d3c
...
...
@@ -72,7 +72,6 @@ typedef struct STableCheckInfo {
SCompInfo
*
pCompInfo
;
int32_t
compSize
;
int32_t
numOfBlocks
;
// number of qualified data blocks not the original blocks
SDataCols
*
pDataCols
;
int32_t
chosen
;
// indicate which iterator should move forward
bool
initBuf
;
// whether to initialize the in-memory skip list iterator or not
SSkipListIterator
*
iter
;
// mem buffer skip list iterator
...
...
@@ -117,10 +116,12 @@ typedef struct STsdbQueryHandle {
SFileGroupIter
fileIter
;
SRWHelper
rhelper
;
STableBlockInfo
*
pDataBlockInfo
;
SDataCols
*
pDataCols
;
// in order to hold current file data block
int32_t
allocSize
;
// allocated data block size
SMemTable
*
mem
;
// mem-table
SMemTable
*
imem
;
// imem-table, acquired from snapshot
SArray
*
defaultLoadColumn
;
// default load column
SMemTable
*
mem
;
// mem-table
SMemTable
*
imem
;
// imem-table, acquired from snapshot
SArray
*
defaultLoadColumn
;
// default load column
SDataBlockLoadInfo
dataBlockLoadInfo
;
/* record current block load information */
SLoadCompBlockInfo
compBlockLoadInfo
;
/* record current compblock information in SQuery */
...
...
@@ -181,7 +182,72 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
return
pLocalIdList
;
}
TsdbQueryHandleT
*
tsdbQueryTables
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
static
SArray
*
createCheckInfoFromTableGroup
(
STsdbQueryHandle
*
pQueryHandle
,
STableGroupInfo
*
pGroupList
,
STsdbMeta
*
pMeta
)
{
size_t
sizeOfGroup
=
taosArrayGetSize
(
pGroupList
->
pGroupList
);
assert
(
sizeOfGroup
>=
1
&&
pMeta
!=
NULL
);
// allocate buffer in order to load data blocks from file
SArray
*
pTableCheckInfo
=
taosArrayInit
(
pGroupList
->
numOfTables
,
sizeof
(
STableCheckInfo
));
if
(
pTableCheckInfo
==
NULL
)
{
return
NULL
;
}
// todo apply the lastkey of table check to avoid to load header file
for
(
int32_t
i
=
0
;
i
<
sizeOfGroup
;
++
i
)
{
SArray
*
group
=
*
(
SArray
**
)
taosArrayGet
(
pGroupList
->
pGroupList
,
i
);
size_t
gsize
=
taosArrayGetSize
(
group
);
assert
(
gsize
>
0
);
for
(
int32_t
j
=
0
;
j
<
gsize
;
++
j
)
{
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
group
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
,
.
pTableObj
=
pKeyInfo
->
pTable
};
info
.
tableId
=
((
STable
*
)(
pKeyInfo
->
pTable
))
->
tableId
;
assert
(
info
.
pTableObj
!=
NULL
&&
(
info
.
pTableObj
->
type
==
TSDB_NORMAL_TABLE
||
info
.
pTableObj
->
type
==
TSDB_CHILD_TABLE
||
info
.
pTableObj
->
type
==
TSDB_STREAM_TABLE
));
info
.
tableId
.
tid
=
info
.
pTableObj
->
tableId
.
tid
;
info
.
tableId
.
uid
=
info
.
pTableObj
->
tableId
.
uid
;
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
assert
(
info
.
lastKey
>=
pQueryHandle
->
window
.
skey
);
}
else
{
assert
(
info
.
lastKey
<=
pQueryHandle
->
window
.
skey
);
}
taosArrayPush
(
pTableCheckInfo
,
&
info
);
tsdbDebug
(
"%p check table uid:%"
PRId64
", tid:%d from lastKey:%"
PRId64
" %p"
,
pQueryHandle
,
info
.
tableId
.
uid
,
info
.
tableId
.
tid
,
info
.
lastKey
,
pQueryHandle
->
qinfo
);
}
}
taosArraySort
(
pTableCheckInfo
,
tsdbCheckInfoCompar
);
return
pTableCheckInfo
;
}
static
SArray
*
createCheckInfoFromCheckInfo
(
SArray
*
pTableCheckInfo
,
TSKEY
skey
)
{
size_t
si
=
taosArrayGetSize
(
pTableCheckInfo
);
SArray
*
pNew
=
taosArrayInit
(
si
,
sizeof
(
STableCheckInfo
));
if
(
pNew
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
j
=
0
;
j
<
si
;
++
j
)
{
STableCheckInfo
*
pCheckInfo
=
(
STableCheckInfo
*
)
taosArrayGet
(
pTableCheckInfo
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
skey
,
.
pTableObj
=
pCheckInfo
->
pTableObj
};
info
.
tableId
=
pCheckInfo
->
tableId
;
taosArrayPush
(
pNew
,
&
info
);
}
// it is ordered already, no need to sort again.
taosArraySort
(
pNew
,
tsdbCheckInfoCompar
);
return
pNew
;
}
static
STsdbQueryHandle
*
tsdbQueryTablesImpl
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
void
*
qinfo
)
{
STsdbQueryHandle
*
pQueryHandle
=
calloc
(
1
,
sizeof
(
STsdbQueryHandle
));
if
(
pQueryHandle
==
NULL
)
{
goto
out_of_memory
;
...
...
@@ -205,9 +271,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
}
tsdbTakeMemSnapshot
(
pQueryHandle
->
pTsdb
,
&
pQueryHandle
->
mem
,
&
pQueryHandle
->
imem
);
size_t
sizeOfGroup
=
taosArrayGetSize
(
groupList
->
pGroupList
);
assert
(
sizeOfGroup
>=
1
&&
pCond
!=
NULL
&&
pCond
->
numOfCols
>
0
);
assert
(
pCond
!=
NULL
&&
pCond
->
numOfCols
>
0
);
if
(
ASCENDING_TRAVERSE
(
pCond
->
order
))
{
assert
(
pQueryHandle
->
window
.
skey
<=
pQueryHandle
->
window
.
ekey
);
...
...
@@ -216,21 +280,19 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
}
// allocate buffer in order to load data blocks from file
int32_t
numOfCols
=
pCond
->
numOfCols
;
pQueryHandle
->
statis
=
calloc
(
numOfCols
,
sizeof
(
SDataStatis
));
pQueryHandle
->
statis
=
calloc
(
pCond
->
numOfCols
,
sizeof
(
SDataStatis
));
if
(
pQueryHandle
->
statis
==
NULL
)
{
goto
out_of_memory
;
}
pQueryHandle
->
pColumns
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
// todo: use list instead of array?
pQueryHandle
->
pColumns
=
taosArrayInit
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfoData
));
// todo: use list instead of array?
if
(
pQueryHandle
->
pColumns
==
NULL
)
{
goto
out_of_memory
;
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{{
0
},
0
};
colInfo
.
info
=
pCond
->
colList
[
i
];
colInfo
.
pData
=
calloc
(
1
,
EXTRA_BYTES
+
pQueryHandle
->
outputCapacity
*
pCond
->
colList
[
i
].
bytes
);
if
(
colInfo
.
pData
==
NULL
)
{
...
...
@@ -240,61 +302,47 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle
->
statis
[
i
].
colId
=
colInfo
.
info
.
colId
;
}
pQueryHandle
->
pTableCheckInfo
=
taosArrayInit
(
groupList
->
numOfTables
,
sizeof
(
STableCheckInfo
));
if
(
pQueryHandle
->
pTableCheckInfo
==
NULL
)
{
goto
out_of_memory
;
}
pQueryHandle
->
defaultLoadColumn
=
getDefaultLoadColumns
(
pQueryHandle
,
true
);
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
tsdb
);
assert
(
pMeta
!=
NULL
&&
sizeOfGroup
>=
1
&&
pCond
!=
NULL
&&
pCond
->
numOfCols
>
0
);
// todo apply the lastkey of table check to avoid to load header file
for
(
int32_t
i
=
0
;
i
<
sizeOfGroup
;
++
i
)
{
SArray
*
group
=
*
(
SArray
**
)
taosArrayGet
(
groupList
->
pGroupList
,
i
);
size_t
gsize
=
taosArrayGetSize
(
group
);
assert
(
gsize
>
0
);
for
(
int32_t
j
=
0
;
j
<
gsize
;
++
j
)
{
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
group
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
,
.
pTableObj
=
pKeyInfo
->
pTable
};
info
.
tableId
=
((
STable
*
)(
pKeyInfo
->
pTable
))
->
tableId
;
assert
(
pMeta
!=
NULL
);
assert
(
info
.
pTableObj
!=
NULL
&&
(
info
.
pTableObj
->
type
==
TSDB_NORMAL_TABLE
||
info
.
pTableObj
->
type
==
TSDB_CHILD_TABLE
||
info
.
pTableObj
->
type
==
TSDB_STREAM_TABLE
));
info
.
tableId
.
tid
=
info
.
pTableObj
->
tableId
.
tid
;
info
.
tableId
.
uid
=
info
.
pTableObj
->
tableId
.
uid
;
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
assert
(
info
.
lastKey
>=
pQueryHandle
->
window
.
skey
);
}
else
{
assert
(
info
.
lastKey
<=
pQueryHandle
->
window
.
skey
);
}
taosArrayPush
(
pQueryHandle
->
pTableCheckInfo
,
&
info
);
tsdbDebug
(
"%p check table uid:%"
PRId64
", tid:%d from lastKey:%"
PRId64
" %p"
,
pQueryHandle
,
info
.
tableId
.
uid
,
info
.
tableId
.
tid
,
info
.
lastKey
,
qinfo
);
}
pQueryHandle
->
pDataCols
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pQueryHandle
->
pTsdb
->
config
.
maxRowsPerFileBlock
);
if
(
pQueryHandle
->
pDataCols
==
NULL
)
{
tsdbError
(
"%p failed to malloc buf for pDataCols, %p"
,
pQueryHandle
,
pQueryHandle
->
qinfo
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
out_of_memory
;
}
taosArraySort
(
pQueryHandle
->
pTableCheckInfo
,
tsdbCheckInfoCompar
);
pQueryHandle
->
defaultLoadColumn
=
getDefaultLoadColumns
(
pQueryHandle
,
true
);
tsdbDebug
(
"%p total numOfTable:%"
PRIzu
" in query, %p"
,
pQueryHandle
,
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
),
pQueryHandle
->
qinfo
);
tsdbInitDataBlockLoadInfo
(
&
pQueryHandle
->
dataBlockLoadInfo
);
tsdbInitCompBlockLoadInfo
(
&
pQueryHandle
->
compBlockLoadInfo
);
return
(
TsdbQueryHandleT
)
pQueryHandle
;
out_of_memory:
out_of_memory:
tsdbCleanupQueryHandle
(
pQueryHandle
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
TsdbQueryHandleT
*
tsdbQueryTables
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
STsdbQueryHandle
*
pQueryHandle
=
tsdbQueryTablesImpl
(
tsdb
,
pCond
,
qinfo
);
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
tsdb
);
assert
(
pMeta
!=
NULL
);
// todo apply the lastkey of table check to avoid to load header file
pQueryHandle
->
pTableCheckInfo
=
createCheckInfoFromTableGroup
(
pQueryHandle
,
groupList
,
pMeta
);
if
(
pQueryHandle
->
pTableCheckInfo
==
NULL
)
{
tsdbCleanupQueryHandle
(
pQueryHandle
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
tsdbDebug
(
"%p total numOfTable:%"
PRIzu
" in query, %p"
,
pQueryHandle
,
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
),
pQueryHandle
->
qinfo
);
return
(
TsdbQueryHandleT
)
pQueryHandle
;
}
TsdbQueryHandleT
tsdbQueryLastRow
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
pCond
->
twindow
=
changeTableGroupByLastrow
(
groupList
);
...
...
@@ -689,22 +737,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
}
static
int32_t
doLoadFileDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SCompBlock
*
pBlock
,
STableCheckInfo
*
pCheckInfo
,
int32_t
slotIndex
)
{
STsdbRepo
*
pRepo
=
pQueryHandle
->
pTsdb
;
int64_t
st
=
taosGetTimestampUs
();
if
(
pCheckInfo
->
pDataCols
==
NULL
)
{
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
pRepo
);
pCheckInfo
->
pDataCols
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pRepo
->
config
.
maxRowsPerFileBlock
);
if
(
pCheckInfo
->
pDataCols
==
NULL
)
{
tsdbError
(
"%p failed to malloc buf for pDataCols, %p"
,
pQueryHandle
,
pQueryHandle
->
qinfo
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_error
;
}
}
int64_t
st
=
taosGetTimestampUs
();
STSchema
*
pSchema
=
tsdbGetTableSchema
(
pCheckInfo
->
pTableObj
);
int32_t
code
=
tdInitDataCols
(
pCheckInfo
->
pDataCols
,
pSchema
);
STSchema
*
pSchema
=
tsdbGetTableSchema
(
pCheckInfo
->
pTableObj
);
int32_t
code
=
tdInitDataCols
(
pQueryHandle
->
pDataCols
,
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbError
(
"%p failed to malloc buf for pDataCols, %p"
,
pQueryHandle
,
pQueryHandle
->
qinfo
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -1924,77 +1960,33 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_ALL
;
return
true
;
}
else
{
STsdbQueryHandle
*
pSecQueryHandle
=
calloc
(
1
,
sizeof
(
STsdbQueryHandle
));
if
(
pSecQueryHandle
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
STimeWindow
win
=
(
STimeWindow
)
{
pQueryHandle
->
window
.
skey
,
INT64_MAX
};
STsdbQueryCond
cond
=
{
.
order
=
TSDB_ORDER_ASC
,
.
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pQueryHandle
)),
.
twindow
=
win
};
cond
.
colList
=
calloc
(
cond
.
numOfCols
,
sizeof
(
SColumnInfo
));
if
(
cond
.
colList
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
false
;
}
pSecQueryHandle
->
order
=
TSDB_ORDER_ASC
;
pSecQueryHandle
->
window
=
(
STimeWindow
)
{
pQueryHandle
->
window
.
skey
,
INT64_MAX
};
pSecQueryHandle
->
pTsdb
=
pQueryHandle
->
pTsdb
;
pSecQueryHandle
->
type
=
TSDB_QUERY_TYPE_ALL
;
pSecQueryHandle
->
cur
.
fid
=
-
1
;
pSecQueryHandle
->
cur
.
win
=
TSWINDOW_INITIALIZER
;
pSecQueryHandle
->
checkFiles
=
true
;
pSecQueryHandle
->
activeIndex
=
0
;
pSecQueryHandle
->
outputCapacity
=
((
STsdbRepo
*
)
pSecQueryHandle
->
pTsdb
)
->
config
.
maxRowsPerFileBlock
;
if
(
tsdbInitReadHelper
(
&
pSecQueryHandle
->
rhelper
,
(
STsdbRepo
*
)
pSecQueryHandle
->
pTsdb
)
!=
0
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
free
(
pSecQueryHandle
);
return
false
;
for
(
int32_t
i
=
0
;
i
<
cond
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
memcpy
(
&
cond
.
colList
[
i
],
&
pColInfoData
->
info
,
sizeof
(
SColumnInfo
));
}
tsdbTakeMemSnapshot
(
pSecQueryHandle
->
pTsdb
,
&
pSecQueryHandle
->
mem
,
&
pSecQueryHandle
->
imem
);
STsdbQueryHandle
*
pSecQueryHandle
=
tsdbQueryTablesImpl
(
pQueryHandle
->
pTsdb
,
&
cond
,
pQueryHandle
->
qinfo
);
// allocate buffer in order to load data blocks from file
int32_t
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pQueryHandle
));
taosTFree
(
cond
.
colList
);
pSecQueryHandle
->
statis
=
calloc
(
numOfCols
,
sizeof
(
SDataStatis
));
pSecQueryHandle
->
pColumns
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
if
(
pSecQueryHandle
->
statis
==
NULL
||
pSecQueryHandle
->
pColumns
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
pSecQueryHandle
->
pTableCheckInfo
=
createCheckInfoFromCheckInfo
(
pQueryHandle
->
pTableCheckInfo
,
pSecQueryHandle
->
window
.
skey
);
if
(
pSecQueryHandle
->
pTableCheckInfo
==
NULL
)
{
tsdbCleanupQueryHandle
(
pSecQueryHandle
);
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{{
0
},
0
};
SColumnInfoData
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
colInfo
.
info
=
pCol
->
info
;
colInfo
.
pData
=
calloc
(
1
,
EXTRA_BYTES
+
pQueryHandle
->
outputCapacity
*
pCol
->
info
.
bytes
);
if
(
colInfo
.
pData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbCleanupQueryHandle
(
pSecQueryHandle
);
return
false
;
}
taosArrayPush
(
pSecQueryHandle
->
pColumns
,
&
colInfo
);
}
size_t
si
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
pSecQueryHandle
->
pTableCheckInfo
=
taosArrayInit
(
si
,
sizeof
(
STableCheckInfo
));
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
pQueryHandle
->
pTsdb
);
assert
(
pMeta
!=
NULL
);
for
(
int32_t
j
=
0
;
j
<
si
;
++
j
)
{
STableCheckInfo
*
pCheckInfo
=
(
STableCheckInfo
*
)
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pSecQueryHandle
->
window
.
skey
,
.
pTableObj
=
pCheckInfo
->
pTableObj
,
};
info
.
tableId
=
pCheckInfo
->
tableId
;
taosArrayPush
(
pSecQueryHandle
->
pTableCheckInfo
,
&
info
);
}
tsdbInitDataBlockLoadInfo
(
&
pSecQueryHandle
->
dataBlockLoadInfo
);
tsdbInitCompBlockLoadInfo
(
&
pSecQueryHandle
->
compBlockLoadInfo
);
pSecQueryHandle
->
defaultLoadColumn
=
taosArrayClone
(
pQueryHandle
->
defaultLoadColumn
);
if
(
!
tsdbNextDataBlock
((
void
*
)
pSecQueryHandle
))
{
tsdbCleanupQueryHandle
(
pSecQueryHandle
);
return
false
;
...
...
@@ -2003,6 +1995,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbRetrieveDataBlockInfo
((
void
*
)
pSecQueryHandle
,
&
blockInfo
);
tsdbRetrieveDataBlock
((
void
*
)
pSecQueryHandle
,
pSecQueryHandle
->
defaultLoadColumn
);
int32_t
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pSecQueryHandle
));
size_t
si
=
taosArrayGetSize
(
pSecQueryHandle
->
pTableCheckInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
memcpy
((
char
*
)
pCol
->
pData
,
(
char
*
)
pCol
->
pData
+
pCol
->
info
.
bytes
*
(
pQueryHandle
->
cur
.
rows
-
1
),
pCol
->
info
.
bytes
);
...
...
@@ -2016,11 +2011,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
SColumnInfoData
*
pTSCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
0
);
// it is ascending order
pQueryHandle
->
cur
.
win
=
(
STimeWindow
){((
TSKEY
*
)
pTSCol
->
pData
)[
0
],
((
TSKEY
*
)
pTSCol
->
pData
)[
1
]}
;
pQueryHandle
->
order
=
TSDB_ORDER_DESC
;
pQueryHandle
->
window
=
pQueryHandle
->
cur
.
win
;
pQueryHandle
->
cur
.
win
=
(
STimeWindow
){((
TSKEY
*
)
pTSCol
->
pData
)[
0
],
((
TSKEY
*
)
pTSCol
->
pData
)[
1
]};
pQueryHandle
->
cur
.
rows
=
2
;
pQueryHandle
->
cur
.
mixBlock
=
true
;
pQueryHandle
->
order
=
TSDB_ORDER_DESC
;
int32_t
step
=
-
1
;
// one step for ascending order traverse
for
(
int32_t
j
=
0
;
j
<
si
;
++
j
)
{
...
...
@@ -2686,8 +2681,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STableCheckInfo
*
pTableCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
i
);
destroyTableMemIterator
(
pTableCheckInfo
);
tdFreeDataCols
(
pTableCheckInfo
->
pDataCols
);
pTableCheckInfo
->
pDataCols
=
NULL
;
taosTFree
(
pTableCheckInfo
->
pCompInfo
);
}
taosArrayDestroy
(
pQueryHandle
->
pTableCheckInfo
);
...
...
@@ -2711,6 +2704,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tsdbDestroyHelper
(
&
pQueryHandle
->
rhelper
);
tdFreeDataCols
(
pQueryHandle
->
pDataCols
);
pQueryHandle
->
pDataCols
=
NULL
;
SIOCostSummary
*
pCost
=
&
pQueryHandle
->
cost
;
tsdbDebug
(
"%p :io-cost summary: statis-info:%"
PRId64
" us, datablock:%"
PRId64
" us, check data:%"
PRId64
" us, %p"
,
pQueryHandle
,
pCost
->
statisInfoLoadTime
,
pCost
->
blockLoadTime
,
pCost
->
checkForNextTime
,
pQueryHandle
->
qinfo
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录