Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
230164c6
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
230164c6
编写于
9月 12, 2021
作者:
H
Haojun Liao
提交者:
GitHub
9月 12, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #7871 from taosdata/feature/query
Feature/query
上级
e9e5dc22
c90d48a7
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
214 addition
and
146 deletion
+214
-146
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+8
-2
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+2
-13
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+0
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+3
-6
src/client/src/tscServer.c
src/client/src/tscServer.c
+30
-14
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+15
-16
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+45
-46
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-12
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+9
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+10
-7
src/query/src/qUtil.c
src/query/src/qUtil.c
+77
-13
src/tsdb/inc/tsdbMeta.h
src/tsdb/inc/tsdbMeta.h
+1
-1
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+1
-5
src/util/inc/tlosertree.h
src/util/inc/tlosertree.h
+2
-3
src/util/src/hash.c
src/util/src/hash.c
+5
-3
src/util/src/tarray.c
src/util/src/tarray.c
+2
-1
src/util/src/tlosertree.c
src/util/src/tlosertree.c
+3
-2
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
230164c6
...
...
@@ -92,7 +92,7 @@ typedef struct SMergeTsCtx {
}
SMergeTsCtx
;
typedef
struct
SVgroupTableInfo
{
SVgroup
Info
vgInfo
;
SVgroup
Msg
vgInfo
;
SArray
*
itemList
;
// SArray<STableIdInfo>
}
SVgroupTableInfo
;
...
...
@@ -174,7 +174,9 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool
tscIsInsertData
(
char
*
sqlstr
);
int
tscAllocPayload
(
SSqlCmd
*
pCmd
,
int
size
);
// the memory is not reset in case of fast allocate payload function
int32_t
tscAllocPayloadFast
(
SSqlCmd
*
pCmd
,
size_t
size
);
int32_t
tscAllocPayload
(
SSqlCmd
*
pCmd
,
int
size
);
TAOS_FIELD
tscCreateField
(
int8_t
type
,
const
char
*
name
,
int16_t
bytes
);
...
...
@@ -288,7 +290,11 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
SVgroupsInfo
*
tscVgroupInfoClone
(
SVgroupsInfo
*
pInfo
);
void
*
tscVgroupInfoClear
(
SVgroupsInfo
*
pInfo
);
#if 0
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
#endif
/**
* The create object function must be successful expect for the out of memory issue.
*
...
...
src/client/inc/tsclient.h
浏览文件 @
230164c6
...
...
@@ -234,7 +234,6 @@ typedef struct STableDataBlocks {
typedef
struct
{
STableMeta
*
pTableMeta
;
SArray
*
vgroupIdList
;
// SVgroupsInfo *pVgroupsInfo;
}
STableMetaVgroupInfo
;
typedef
struct
SInsertStatementParam
{
...
...
@@ -286,20 +285,14 @@ typedef struct {
int32_t
resColumnId
;
}
SSqlCmd
;
typedef
struct
SResRec
{
int
numOfRows
;
int
numOfTotal
;
}
SResRec
;
typedef
struct
{
int32_t
numOfRows
;
// num of results in current retrieval
int64_t
numOfRowsGroup
;
// num of results of current group
int64_t
numOfTotal
;
// num of total results
int64_t
numOfClauseTotal
;
// num of total result in current subclause
char
*
pRsp
;
int32_t
rspType
;
int32_t
rspLen
;
uint64_t
qId
;
uint64_t
qId
;
// query id of SQInfo
int64_t
useconds
;
int64_t
offset
;
// offset value from vnode during projection query of stable
int32_t
row
;
...
...
@@ -307,8 +300,6 @@ typedef struct {
int16_t
precision
;
bool
completed
;
int32_t
code
;
int32_t
numOfGroups
;
SResRec
*
pGroupRec
;
char
*
data
;
TAOS_ROW
tsrow
;
TAOS_ROW
urow
;
...
...
@@ -316,8 +307,7 @@ typedef struct {
char
**
buffer
;
// Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex
*
pColumnIndex
;
TAOS_FIELD
*
final
;
SArithmeticSupport
*
pArithSup
;
// support the arithmetic expression calculation on agg functions
TAOS_FIELD
*
final
;
struct
SGlobalMerger
*
pMerger
;
}
SSqlRes
;
...
...
@@ -377,7 +367,6 @@ typedef struct SSqlObj {
tsem_t
rspSem
;
SSqlCmd
cmd
;
SSqlRes
res
;
bool
isBind
;
SSubqueryState
subState
;
struct
SSqlObj
**
pSubs
;
...
...
src/client/src/tscPrepare.c
浏览文件 @
230164c6
...
...
@@ -1491,7 +1491,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
pSql
->
signature
=
pSql
;
pSql
->
pTscObj
=
pObj
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA
;
pSql
->
isBind
=
true
;
pStmt
->
pSql
=
pSql
;
pStmt
->
last
=
STMT_INIT
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
230164c6
...
...
@@ -8685,7 +8685,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
if
(
p
->
vgroupIdList
!=
NULL
)
{
size_t
s
=
taosArrayGetSize
(
p
->
vgroupIdList
);
size_t
vgroupsz
=
sizeof
(
SVgroup
Info
)
*
s
+
sizeof
(
SVgroupsInfo
);
size_t
vgroupsz
=
sizeof
(
SVgroup
Msg
)
*
s
+
sizeof
(
SVgroupsInfo
);
pTableMetaInfo
->
vgroupList
=
calloc
(
1
,
vgroupsz
);
if
(
pTableMetaInfo
->
vgroupList
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -8700,14 +8700,11 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
taosHashGetClone
(
tscVgroupMap
,
id
,
sizeof
(
*
id
),
NULL
,
&
existVgroupInfo
);
assert
(
existVgroupInfo
.
inUse
>=
0
);
SVgroup
Info
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
j
];
SVgroup
Msg
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
j
];
pVgroup
->
numOfEps
=
existVgroupInfo
.
numOfEps
;
pVgroup
->
vgId
=
existVgroupInfo
.
vgId
;
for
(
int32_t
k
=
0
;
k
<
existVgroupInfo
.
numOfEps
;
++
k
)
{
pVgroup
->
epAddr
[
k
].
port
=
existVgroupInfo
.
ep
[
k
].
port
;
pVgroup
->
epAddr
[
k
].
fqdn
=
strndup
(
existVgroupInfo
.
ep
[
k
].
fqdn
,
TSDB_FQDN_LEN
);
}
memcpy
(
&
pVgroup
->
epAddr
,
&
existVgroupInfo
.
ep
,
sizeof
(
pVgroup
->
epAddr
));
}
}
}
...
...
src/client/src/tscServer.c
浏览文件 @
230164c6
...
...
@@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) {
return
ret
;
}
static
void
tscSetDnodeEpSet
(
SRpcEpSet
*
pEpSet
,
SVgroup
Info
*
pVgroupInfo
)
{
static
void
tscSetDnodeEpSet
(
SRpcEpSet
*
pEpSet
,
SVgroup
Msg
*
pVgroupInfo
)
{
assert
(
pEpSet
!=
NULL
&&
pVgroupInfo
!=
NULL
&&
pVgroupInfo
->
numOfEps
>
0
);
// Issue the query to one of the vnode among a vgroup randomly.
...
...
@@ -93,6 +93,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
existed
=
true
;
}
}
assert
(
existed
);
}
...
...
@@ -723,7 +724,7 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab
int32_t
index
=
pTableMetaInfo
->
vgroupIndex
;
assert
(
index
>=
0
);
SVgroup
Info
*
pVgroupInfo
=
NULL
;
SVgroup
Msg
*
pVgroupInfo
=
NULL
;
if
(
pTableMetaInfo
->
vgroupList
&&
pTableMetaInfo
->
vgroupList
->
numOfVgroups
>
0
)
{
assert
(
index
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
index
];
...
...
@@ -861,8 +862,8 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
(
*
pMsg
)
+=
sizeof
(
SSqlExpr
);
for
(
int32_t
j
=
0
;
j
<
pExpr
->
numOfParams
;
++
j
)
{
// todo add log
pSqlExpr
->
param
[
j
].
nType
=
hton
s
((
uint16_t
)
pExpr
->
param
[
j
].
nType
);
pSqlExpr
->
param
[
j
].
nLen
=
htons
(
pExpr
->
param
[
j
].
nLen
);
pSqlExpr
->
param
[
j
].
nType
=
hton
l
(
pExpr
->
param
[
j
].
nType
);
pSqlExpr
->
param
[
j
].
nLen
=
htonl
(
pExpr
->
param
[
j
].
nLen
);
if
(
pExpr
->
param
[
j
].
nType
==
TSDB_DATA_TYPE_BINARY
)
{
memcpy
((
*
pMsg
),
pExpr
->
param
[
j
].
pz
,
pExpr
->
param
[
j
].
nLen
);
...
...
@@ -880,17 +881,22 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
int
tscBuildQueryMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
NULL
;
STableMeta
*
pTableMeta
=
NULL
;
STableMetaInfo
*
pTableMetaInfo
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
size
=
tscEstimateQueryMsgSize
(
pSql
);
assert
(
size
>
0
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
Fast
(
pCmd
,
size
))
{
tscError
(
"%p failed to malloc for query msg"
,
pSql
);
return
TSDB_CODE_TSC_INVALID_OPERATION
;
// todo add test for this
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
SQueryAttr
query
=
{{
0
}};
tscCreateQueryFromQueryInfo
(
pQueryInfo
,
&
query
,
pSql
);
...
...
@@ -941,14 +947,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
pointInterpQuery
=
query
.
pointInterpQuery
;
pQueryMsg
->
needReverseScan
=
query
.
needReverseScan
;
pQueryMsg
->
stateWindow
=
query
.
stateWindow
;
pQueryMsg
->
numOfTags
=
htonl
(
numOfTags
);
pQueryMsg
->
sqlstrLen
=
htonl
(
sqlLen
);
pQueryMsg
->
sw
.
gap
=
htobe64
(
query
.
sw
.
gap
);
pQueryMsg
->
sw
.
primaryColId
=
htonl
(
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
pQueryMsg
->
secondStageOutput
=
htonl
(
query
.
numOfExpr2
);
pQueryMsg
->
numOfOutput
=
htons
((
int16_t
)
query
.
numOfOutput
);
// this is the stage one output column number
pQueryMsg
->
numOfOutput
=
htons
((
int16_t
)
query
.
numOfOutput
);
// this is the stage one output column number
pQueryMsg
->
numOfGroupCols
=
htons
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
);
pQueryMsg
->
tagNameRelType
=
htons
(
pQueryInfo
->
tagCond
.
relType
);
...
...
@@ -968,7 +973,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
tableCols
[
i
].
type
=
htons
(
pCol
->
type
);
//pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters);
pQueryMsg
->
tableCols
[
i
].
flist
.
numOfFilters
=
0
;
pQueryMsg
->
tableCols
[
i
].
flist
.
filterInfo
=
0
;
// append the filter information after the basic column information
//serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg);
}
...
...
@@ -981,6 +986,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg
+=
pCond
->
len
;
}
}
else
{
pQueryMsg
->
colCondLen
=
0
;
}
for
(
int32_t
i
=
0
;
i
<
query
.
numOfOutput
;
++
i
)
{
...
...
@@ -1060,6 +1067,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg
+=
pCond
->
len
;
}
}
else
{
pQueryMsg
->
tagCondLen
=
0
;
}
if
(
pQueryInfo
->
bufLen
>
0
)
{
...
...
@@ -1089,6 +1098,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
tsBuf
.
tsOrder
=
htonl
(
pQueryInfo
->
tsBuf
->
tsOrder
);
pQueryMsg
->
tsBuf
.
tsLen
=
htonl
(
pQueryMsg
->
tsBuf
.
tsLen
);
pQueryMsg
->
tsBuf
.
tsNumOfBlocks
=
htonl
(
pQueryMsg
->
tsBuf
.
tsNumOfBlocks
);
}
else
{
pQueryMsg
->
tsBuf
.
tsLen
=
0
;
pQueryMsg
->
tsBuf
.
tsNumOfBlocks
=
0
;
}
int32_t
numOfOperator
=
(
int32_t
)
taosArrayGetSize
(
queryOperator
);
...
...
@@ -1126,6 +1138,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg
+=
pUdfInfo
->
contLen
;
}
}
else
{
pQueryMsg
->
udfContentOffset
=
0
;
pQueryMsg
->
udfContentLen
=
0
;
}
memcpy
(
pMsg
,
pSql
->
sqlstr
,
sqlLen
);
...
...
@@ -2146,7 +2161,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
*
size
=
(
int32_t
)(
sizeof
(
SVgroupMsg
)
*
pVgroupMsg
->
numOfVgroups
+
sizeof
(
SVgroupsMsg
));
size_t
vgroupsz
=
sizeof
(
SVgroup
Info
)
*
pVgroupMsg
->
numOfVgroups
+
sizeof
(
SVgroupsInfo
);
size_t
vgroupsz
=
sizeof
(
SVgroup
Msg
)
*
pVgroupMsg
->
numOfVgroups
+
sizeof
(
SVgroupsInfo
);
SVgroupsInfo
*
pVgroupInfo
=
calloc
(
1
,
vgroupsz
);
assert
(
pVgroupInfo
!=
NULL
);
...
...
@@ -2156,7 +2171,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
}
else
{
for
(
int32_t
j
=
0
;
j
<
pVgroupInfo
->
numOfVgroups
;
++
j
)
{
// just init, no need to lock
SVgroup
Info
*
pVgroup
=
&
pVgroupInfo
->
vgroups
[
j
];
SVgroup
Msg
*
pVgroup
=
&
pVgroupInfo
->
vgroups
[
j
];
SVgroupMsg
*
vmsg
=
&
pVgroupMsg
->
vgroups
[
j
];
vmsg
->
vgId
=
htonl
(
vmsg
->
vgId
);
...
...
@@ -2168,7 +2183,8 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
pVgroup
->
vgId
=
vmsg
->
vgId
;
for
(
int32_t
k
=
0
;
k
<
vmsg
->
numOfEps
;
++
k
)
{
pVgroup
->
epAddr
[
k
].
port
=
vmsg
->
epAddr
[
k
].
port
;
pVgroup
->
epAddr
[
k
].
fqdn
=
strndup
(
vmsg
->
epAddr
[
k
].
fqdn
,
TSDB_FQDN_LEN
);
tstrncpy
(
pVgroup
->
epAddr
[
k
].
fqdn
,
vmsg
->
epAddr
[
k
].
fqdn
,
TSDB_FQDN_LEN
);
// pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
}
doUpdateVgroupInfo
(
pVgroup
->
vgId
,
vmsg
);
...
...
src/client/src/tscSubquery.c
浏览文件 @
230164c6
...
...
@@ -623,13 +623,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int16_t
colId
=
tscGetJoinTagColIdByUid
(
&
pQueryInfo
->
tagCond
,
pTableMetaInfo
->
pTableMeta
->
id
.
uid
);
// set the tag column id for executor to extract correct tag value
#ifndef _TD_NINGSI_60
pExpr
->
base
.
param
[
0
]
=
(
tVariant
)
{.
i64
=
colId
,
.
nType
=
TSDB_DATA_TYPE_BIGINT
,
.
nLen
=
sizeof
(
int64_t
)};
#else
pExpr
->
base
.
param
[
0
].
i64
=
colId
;
pExpr
->
base
.
param
[
0
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
pExpr
->
base
.
param
[
0
].
nLen
=
sizeof
(
int64_t
);
#endif
tVariant
*
pVariant
=
&
pExpr
->
base
.
param
[
0
];
pVariant
->
i64
=
colId
;
pVariant
->
nType
=
TSDB_DATA_TYPE_BIGINT
;
pVariant
->
nLen
=
sizeof
(
int64_t
);
pExpr
->
base
.
numOfParams
=
1
;
}
...
...
@@ -748,10 +747,11 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
SVgroupTableInfo
info
=
{{
0
}};
for
(
int32_t
m
=
0
;
m
<
pvg
->
numOfVgroups
;
++
m
)
{
if
(
tt
->
vgId
==
pvg
->
vgroups
[
m
].
vgId
)
{
tscSVgroupInfoCopy
(
&
info
.
vgInfo
,
&
pvg
->
vgroups
[
m
]
);
memcpy
(
&
info
.
vgInfo
,
&
pvg
->
vgroups
[
m
],
sizeof
(
info
.
vgInfo
)
);
break
;
}
}
assert
(
info
.
vgInfo
.
numOfEps
!=
0
);
vgTables
=
taosArrayInit
(
4
,
sizeof
(
STableIdInfo
));
...
...
@@ -2463,7 +2463,7 @@ static void doConcurrentlySendSubQueries(SSqlObj* pSql) {
SSubqueryState
*
pState
=
&
pSql
->
subState
;
// concurrently sent the query requests.
const
int32_t
MAX_REQUEST_PER_TASK
=
8
;
const
int32_t
MAX_REQUEST_PER_TASK
=
4
;
int32_t
numOfTasks
=
(
pState
->
numOfSub
+
MAX_REQUEST_PER_TASK
-
1
)
/
MAX_REQUEST_PER_TASK
;
assert
(
numOfTasks
>=
1
);
...
...
@@ -2550,13 +2550,14 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs
->
pExtMemBuffer
=
pMemoryBuf
;
trs
->
pOrderDescriptor
=
pDesc
;
trs
->
localBuffer
=
(
tFilePage
*
)
calloc
(
1
,
nBufferSize
+
sizeof
(
tFilePage
));
trs
->
localBuffer
=
(
tFilePage
*
)
malloc
(
nBufferSize
+
sizeof
(
tFilePage
));
if
(
trs
->
localBuffer
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s"
,
pSql
->
self
,
i
,
strerror
(
errno
));
tfree
(
trs
);
break
;
}
trs
->
localBuffer
->
num
=
0
;
trs
->
subqueryIndex
=
i
;
trs
->
pParentSql
=
pSql
;
...
...
@@ -2651,7 +2652,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
int32_t
subqueryIndex
=
trsupport
->
subqueryIndex
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
);
SVgroup
Info
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
0
];
SVgroup
Msg
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
0
];
tExtMemBufferClear
(
trsupport
->
pExtMemBuffer
[
subqueryIndex
]);
...
...
@@ -2879,7 +2880,6 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
pParentSql
->
res
.
precision
=
pSql
->
res
.
precision
;
pParentSql
->
res
.
numOfRows
=
0
;
pParentSql
->
res
.
row
=
0
;
pParentSql
->
res
.
numOfGroups
=
0
;
tscFreeRetrieveSup
(
pSql
);
...
...
@@ -2930,7 +2930,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSubqueryState
*
pState
=
&
pParentSql
->
subState
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
);
SVgroup
Info
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
0
];
SVgroup
Msg
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
0
];
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
trsupport
->
numOfRetry
=
MAX_NUM_OF_SUBQUERY_RETRY
;
...
...
@@ -3058,7 +3058,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
assert
(
pQueryInfo
->
numOfTables
==
1
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
);
SVgroup
Info
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
trsupport
->
subqueryIndex
];
SVgroup
Msg
*
pVgroup
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
trsupport
->
subqueryIndex
];
// stable query killed or other subquery failed, all query stopped
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -3404,7 +3404,6 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
return
;
}
// tscRestoreFuncForSTableQuery(pQueryInfo);
int32_t
rowSize
=
tscGetResRowLength
(
pQueryInfo
->
exprList
);
assert
(
numOfRes
*
rowSize
>
0
);
...
...
src/client/src/tscUtil.c
浏览文件 @
230164c6
...
...
@@ -1347,14 +1347,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree
(
pRes
->
buffer
);
tfree
(
pRes
->
urow
);
tfree
(
pRes
->
pGroupRec
);
tfree
(
pRes
->
pColumnIndex
);
if
(
pRes
->
pArithSup
!=
NULL
)
{
tfree
(
pRes
->
pArithSup
->
data
);
tfree
(
pRes
->
pArithSup
);
}
tfree
(
pRes
->
final
);
pRes
->
data
=
NULL
;
// pRes->data points to the buffer of pRsp, no need to free
...
...
@@ -2087,32 +2080,35 @@ bool tscIsInsertData(char* sqlstr) {
}
while
(
1
);
}
int
tscAllocPayload
(
SSqlCmd
*
pCmd
,
in
t
size
)
{
int
32_t
tscAllocPayloadFast
(
SSqlCmd
*
pCmd
,
size_
t
size
)
{
if
(
pCmd
->
payload
==
NULL
)
{
assert
(
pCmd
->
allocSize
==
0
);
pCmd
->
payload
=
(
char
*
)
calloc
(
1
,
size
);
if
(
pCmd
->
payload
==
NULL
)
{
pCmd
->
payload
=
malloc
(
size
);
pCmd
->
allocSize
=
(
uint32_t
)
size
;
}
else
if
(
pCmd
->
allocSize
<
size
)
{
char
*
tmp
=
realloc
(
pCmd
->
payload
,
size
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pCmd
->
allocSize
=
size
;
}
else
{
if
(
pCmd
->
allocSize
<
(
uint32_t
)
size
)
{
char
*
b
=
realloc
(
pCmd
->
payload
,
size
);
if
(
b
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pCmd
->
payload
=
tmp
;
pCmd
->
allocSize
=
(
uint32_t
)
size
;
}
pCmd
->
payload
=
b
;
pCmd
->
allocSize
=
size
;
}
assert
(
pCmd
->
allocSize
>=
size
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
tscAllocPayload
(
SSqlCmd
*
pCmd
,
int
size
)
{
assert
(
size
>
0
);
int32_t
code
=
tscAllocPayloadFast
(
pCmd
,
(
size_t
)
size
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
memset
(
pCmd
->
payload
,
0
,
pCmd
->
allocSize
);
}
assert
(
pCmd
->
allocSize
>=
(
uint32_t
)
size
&&
size
>
0
);
return
TSDB_CODE_SUCCESS
;
return
code
;
}
TAOS_FIELD
tscCreateField
(
int8_t
type
,
const
char
*
name
,
int16_t
bytes
)
{
...
...
@@ -3369,11 +3365,11 @@ void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
size_t
num
=
taosArrayGetSize
(
pVgroupTables
);
for
(
size_t
i
=
0
;
i
<
num
;
i
++
)
{
SVgroupTableInfo
*
pInfo
=
taosArrayGet
(
pVgroupTables
,
i
);
#if 0
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
tfree(pInfo->vgInfo.epAddr[j].fqdn);
}
#endif
taosArrayDestroy
(
pInfo
->
itemList
);
}
...
...
@@ -3387,9 +3383,9 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
assert
(
size
>
index
);
SVgroupTableInfo
*
pInfo
=
taosArrayGet
(
pVgroupTable
,
index
);
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgInfo
.
numOfEps
;
++
j
)
{
tfree
(
pInfo
->
vgInfo
.
epAddr
[
j
].
fqdn
);
}
//
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
//
tfree(pInfo->vgInfo.epAddr[j].fqdn);
//
}
taosArrayDestroy
(
pInfo
->
itemList
);
taosArrayRemove
(
pVgroupTable
,
index
);
...
...
@@ -3399,9 +3395,12 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
memset
(
info
,
0
,
sizeof
(
SVgroupTableInfo
));
info
->
vgInfo
=
pInfo
->
vgInfo
;
#if 0
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
#endif
if
(
pInfo
->
itemList
)
{
info
->
itemList
=
taosArrayDup
(
pInfo
->
itemList
);
...
...
@@ -3464,13 +3463,9 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableM
}
pTableMetaInfo
->
pTableMeta
=
pTableMeta
;
if
(
pTableMetaInfo
->
pTableMeta
==
NULL
)
{
pTableMetaInfo
->
tableMetaSize
=
0
;
}
else
{
pTableMetaInfo
->
tableMetaSize
=
tscGetTableMetaSize
(
pTableMeta
);
}
pTableMetaInfo
->
tableMetaSize
=
(
pTableMetaInfo
->
pTableMeta
==
NULL
)
?
0
:
tscGetTableMetaSize
(
pTableMeta
);
pTableMetaInfo
->
tableMetaCapacity
=
(
size_t
)(
pTableMetaInfo
->
tableMetaSize
);
if
(
vgroupList
!=
NULL
)
{
pTableMetaInfo
->
vgroupList
=
tscVgroupInfoClone
(
vgroupList
);
...
...
@@ -3718,8 +3713,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
pNewQueryInfo
->
numOfFillVal
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
pNewQueryInfo
->
numOfFillVal
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
memcpy
(
pNewQueryInfo
->
fillVal
,
pQueryInfo
->
fillVal
,
pQueryInfo
->
fieldsInfo
.
numOfOutput
*
sizeof
(
int64_t
));
}
...
...
@@ -3760,7 +3755,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
&
pTableMetaInfo
->
name
,
pTableMeta
,
pTableMetaInfo
->
vgroupList
,
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
}
else
{
// transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo
*
pPrevInfo
=
tscGetTableMetaInfoFromCmd
(
&
pPrevSql
->
cmd
,
0
);
if
(
pPrevInfo
->
pTableMeta
&&
pPrevInfo
->
pTableMeta
->
tableType
<
0
)
{
...
...
@@ -3770,8 +3764,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
STableMeta
*
pPrevTableMeta
=
tscTableMetaDup
(
pPrevInfo
->
pTableMeta
);
SVgroupsInfo
*
pVgroupsInfo
=
pPrevInfo
->
vgroupList
;
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
&
pTableMetaInfo
->
name
,
pPrevTableMeta
,
pVgroupsInfo
,
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
&
pTableMetaInfo
->
name
,
pPrevTableMeta
,
pVgroupsInfo
,
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
}
// this case cannot be happened
...
...
@@ -4415,8 +4409,8 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
return
NULL
;
}
size_t
size
=
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SVgroup
Info
)
*
vgroupList
->
numOfVgroups
;
SVgroupsInfo
*
pNew
=
calloc
(
1
,
size
);
size_t
size
=
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SVgroup
Msg
)
*
vgroupList
->
numOfVgroups
;
SVgroupsInfo
*
pNew
=
malloc
(
size
);
if
(
pNew
==
NULL
)
{
return
NULL
;
}
...
...
@@ -4424,15 +4418,15 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
pNew
->
numOfVgroups
=
vgroupList
->
numOfVgroups
;
for
(
int32_t
i
=
0
;
i
<
vgroupList
->
numOfVgroups
;
++
i
)
{
SVgroup
Info
*
pNewVInfo
=
&
pNew
->
vgroups
[
i
];
SVgroup
Msg
*
pNewVInfo
=
&
pNew
->
vgroups
[
i
];
SVgroup
Info
*
pvInfo
=
&
vgroupList
->
vgroups
[
i
];
SVgroup
Msg
*
pvInfo
=
&
vgroupList
->
vgroups
[
i
];
pNewVInfo
->
vgId
=
pvInfo
->
vgId
;
pNewVInfo
->
numOfEps
=
pvInfo
->
numOfEps
;
for
(
int32_t
j
=
0
;
j
<
pvInfo
->
numOfEps
;
++
j
)
{
pNewVInfo
->
epAddr
[
j
].
fqdn
=
strdup
(
pvInfo
->
epAddr
[
j
].
fqdn
);
pNewVInfo
->
epAddr
[
j
].
port
=
pvInfo
->
epAddr
[
j
].
port
;
tstrncpy
(
pNewVInfo
->
epAddr
[
j
].
fqdn
,
pvInfo
->
epAddr
[
j
].
fqdn
,
TSDB_FQDN_LEN
);
}
}
...
...
@@ -4444,8 +4438,9 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
return
NULL
;
}
#if 0
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SVgroup
Info
*
pVgroupInfo
=
&
vgroupList
->
vgroups
[
i
];
SVgroup
Msg
* pVgroupInfo = &vgroupList->vgroups[i];
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
tfree(pVgroupInfo->epAddr[j].fqdn);
...
...
@@ -4456,10 +4451,11 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
}
}
#endif
tfree
(
vgroupList
);
return
NULL
;
}
# if 0
void
tscSVgroupInfoCopy
(
SVgroupInfo
*
dst
,
const
SVgroupInfo
*
src
)
{
dst
->
vgId
=
src
->
vgId
;
dst
->
numOfEps
=
src
->
numOfEps
;
...
...
@@ -4472,6 +4468,8 @@ void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) {
}
}
#endif
char
*
serializeTagData
(
STagData
*
pTagData
,
char
*
pMsg
)
{
int32_t
n
=
(
int32_t
)
strlen
(
pTagData
->
name
);
*
(
int32_t
*
)
pMsg
=
htonl
(
n
);
...
...
@@ -4612,11 +4610,12 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
SVgroupsInfo
*
tscVgroupsInfoDup
(
SVgroupsInfo
*
pVgroupsInfo
)
{
assert
(
pVgroupsInfo
!=
NULL
);
size_t
size
=
sizeof
(
SVgroup
Info
)
*
pVgroupsInfo
->
numOfVgroups
+
sizeof
(
SVgroupsInfo
);
size_t
size
=
sizeof
(
SVgroup
Msg
)
*
pVgroupsInfo
->
numOfVgroups
+
sizeof
(
SVgroupsInfo
);
SVgroupsInfo
*
pInfo
=
calloc
(
1
,
size
);
pInfo
->
numOfVgroups
=
pVgroupsInfo
->
numOfVgroups
;
for
(
int32_t
m
=
0
;
m
<
pVgroupsInfo
->
numOfVgroups
;
++
m
)
{
tscSVgroupInfoCopy
(
&
pInfo
->
vgroups
[
m
],
&
pVgroupsInfo
->
vgroups
[
m
]);
memcpy
(
&
pInfo
->
vgroups
[
m
],
&
pVgroupsInfo
->
vgroups
[
m
],
sizeof
(
SVgroupMsg
));
// tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]);
}
return
pInfo
;
}
...
...
src/inc/taosmsg.h
浏览文件 @
230164c6
...
...
@@ -766,27 +766,16 @@ typedef struct SSTableVgroupMsg {
int32_t
numOfTables
;
}
SSTableVgroupMsg
,
SSTableVgroupRspMsg
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddr1
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupInfo
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddrMsg
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupMsg
;
typedef
struct
{
int32_t
numOfVgroups
;
SVgroupInfo
vgroups
[];
}
SVgroupsInfo
;
typedef
struct
{
int32_t
numOfVgroups
;
SVgroupMsg
vgroups
[];
}
SVgroupsMsg
;
}
SVgroupsMsg
,
SVgroupsInfo
;
typedef
struct
STableMetaMsg
{
int32_t
contLen
;
...
...
src/query/inc/qExecutor.h
浏览文件 @
230164c6
...
...
@@ -86,11 +86,18 @@ typedef struct SResultRow {
char
*
key
;
// start key of current result row
}
SResultRow
;
typedef
struct
SResultRowCell
{
uint64_t
groupId
;
SResultRow
*
pRow
;
}
SResultRowCell
;
typedef
struct
SGroupResInfo
{
int32_t
totalGroup
;
int32_t
currentGroup
;
int32_t
index
;
SArray
*
pRows
;
// SArray<SResultRow*>
bool
ordered
;
int32_t
position
;
}
SGroupResInfo
;
/**
...
...
@@ -284,8 +291,9 @@ typedef struct SQueryRuntimeEnv {
SDiskbasedResultBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
SHashObj
*
pResultRowListSet
;
// used to check if current ResultRowInfo has ResultRow object or not
SArray
*
pResultRowArrayList
;
// The array list that contains the Result rows
char
*
keyBuf
;
// window key buffer
SResultRowPool
*
pool
;
//
window result object pool
SResultRowPool
*
pool
;
//
The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char
**
prevRow
;
SArray
*
prevResult
;
// intermediate result, SArray<SInterResult>
...
...
src/query/src/qExecutor.c
浏览文件 @
230164c6
...
...
@@ -544,6 +544,8 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult
// add a new result set for a new group
taosHashPut
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pResult
,
POINTER_BYTES
);
SResultRowCell
cell
=
{.
groupId
=
tableGroupId
,
.
pRow
=
pResult
};
taosArrayPush
(
pRuntimeEnv
->
pResultRowArrayList
,
&
cell
);
}
else
{
pResult
=
*
p1
;
}
...
...
@@ -2107,9 +2109,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv
->
pQueryAttr
=
pQueryAttr
;
pRuntimeEnv
->
pResultRowHashTable
=
taosHashInit
(
numOfTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pRuntimeEnv
->
pResultRowListSet
=
taosHashInit
(
numOfTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
pRuntimeEnv
->
pResultRowListSet
=
taosHashInit
(
numOfTables
*
10
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
pRuntimeEnv
->
keyBuf
=
malloc
(
pQueryAttr
->
maxTableColumnWidth
+
sizeof
(
int64_t
)
+
POINTER_BYTES
);
pRuntimeEnv
->
pool
=
initResultRowPool
(
getResultRowSize
(
pRuntimeEnv
));
pRuntimeEnv
->
pResultRowArrayList
=
taosArrayInit
(
numOfTables
,
sizeof
(
SResultRowCell
));
pRuntimeEnv
->
prevRow
=
malloc
(
POINTER_BYTES
*
pQueryAttr
->
numOfCols
+
pQueryAttr
->
srcRowSize
);
pRuntimeEnv
->
tagVal
=
malloc
(
pQueryAttr
->
tagLen
);
...
...
@@ -2384,6 +2387,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv
->
pool
=
destroyResultRowPool
(
pRuntimeEnv
->
pool
);
taosArrayDestroyEx
(
pRuntimeEnv
->
prevResult
,
freeInterResult
);
taosArrayDestroy
(
pRuntimeEnv
->
pResultRowArrayList
);
pRuntimeEnv
->
prevResult
=
NULL
;
}
...
...
@@ -4808,7 +4812,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
SQueryAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
pQueryAttr
->
tsdb
=
tsdb
;
if
(
tsdb
!=
NULL
)
{
int32_t
code
=
setupQueryHandle
(
tsdb
,
pRuntimeEnv
,
pQInfo
->
qId
,
pQueryAttr
->
stableQuery
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -6379,6 +6382,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
if
(
!
pRuntimeEnv
->
pQueryAttr
->
stableQuery
)
{
sortGroupResByOrderList
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pInfo
->
binfo
.
pRes
);
}
toSSDataBlock
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pInfo
->
binfo
.
pRes
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pRuntimeEnv
->
groupResInfo
))
{
...
...
@@ -7600,8 +7604,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pMsg
+=
sizeof
(
SSqlExpr
);
for
(
int32_t
j
=
0
;
j
<
pExprMsg
->
numOfParams
;
++
j
)
{
pExprMsg
->
param
[
j
].
nType
=
hton
s
(
pExprMsg
->
param
[
j
].
nType
);
pExprMsg
->
param
[
j
].
nLen
=
htons
(
pExprMsg
->
param
[
j
].
nLen
);
pExprMsg
->
param
[
j
].
nType
=
hton
l
(
pExprMsg
->
param
[
j
].
nType
);
pExprMsg
->
param
[
j
].
nLen
=
htonl
(
pExprMsg
->
param
[
j
].
nLen
);
if
(
pExprMsg
->
param
[
j
].
nType
==
TSDB_DATA_TYPE_BINARY
)
{
pExprMsg
->
param
[
j
].
pz
=
pMsg
;
...
...
@@ -7648,8 +7652,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pMsg
+=
sizeof
(
SSqlExpr
);
for
(
int32_t
j
=
0
;
j
<
pExprMsg
->
numOfParams
;
++
j
)
{
pExprMsg
->
param
[
j
].
nType
=
hton
s
(
pExprMsg
->
param
[
j
].
nType
);
pExprMsg
->
param
[
j
].
nLen
=
hton
s
(
pExprMsg
->
param
[
j
].
nLen
);
pExprMsg
->
param
[
j
].
nType
=
hton
l
(
pExprMsg
->
param
[
j
].
nType
);
pExprMsg
->
param
[
j
].
nLen
=
hton
l
(
pExprMsg
->
param
[
j
].
nLen
);
if
(
pExprMsg
->
param
[
j
].
nType
==
TSDB_DATA_TYPE_BINARY
)
{
pExprMsg
->
param
[
j
].
pz
=
pMsg
;
...
...
@@ -8648,7 +8652,6 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
SArray
*
prevResult
=
NULL
;
if
(
prevResultLen
>
0
)
{
prevResult
=
interResFromBinary
(
param
->
prevResult
,
prevResultLen
);
pRuntimeEnv
->
prevResult
=
prevResult
;
}
...
...
src/query/src/qUtil.c
浏览文件 @
230164c6
...
...
@@ -436,13 +436,13 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
}
STableQueryInfo
**
pList
=
supporter
->
pTableQueryInfo
;
SResultRowInfo
*
pWindowResInfo1
=
&
(
pList
[
left
]
->
resInfo
);
SResultRow
*
pWindowRes1
=
getResultRow
(
pWindowResInfo1
,
leftPos
);
SResultRow
*
pWindowRes1
=
pList
[
left
]
->
resInfo
.
pResult
[
leftPos
];
// SResultRow * pWindowRes1 = getResultRow(&(pList[left]->resInfo), leftPos);
TSKEY
leftTimestamp
=
pWindowRes1
->
win
.
skey
;
SResultRowInfo
*
pWindowResInfo2
=
&
(
pList
[
right
]
->
resInfo
);
SResultRow
*
pWindowRes2
=
getResultRow
(
pWindowResInfo2
,
rightPos
);
// SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo);
// SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
SResultRow
*
pWindowRes2
=
pList
[
right
]
->
resInfo
.
pResult
[
rightPos
];
TSKEY
rightTimestamp
=
pWindowRes2
->
win
.
skey
;
if
(
leftTimestamp
==
rightTimestamp
)
{
...
...
@@ -456,7 +456,77 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
}
}
static
int32_t
mergeIntoGroupResultImpl
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pTableList
,
int32_t
tsAscOrder
(
const
void
*
p1
,
const
void
*
p2
)
{
SResultRowCell
*
pc1
=
(
SResultRowCell
*
)
p1
;
SResultRowCell
*
pc2
=
(
SResultRowCell
*
)
p2
;
if
(
pc1
->
groupId
==
pc2
->
groupId
)
{
if
(
pc1
->
pRow
->
win
.
skey
==
pc2
->
pRow
->
win
.
skey
)
{
return
0
;
}
else
{
return
(
pc1
->
pRow
->
win
.
skey
<
pc2
->
pRow
->
win
.
skey
)
?
-
1
:
1
;
}
}
else
{
return
(
pc1
->
groupId
<
pc2
->
groupId
)
?
-
1
:
1
;
}
}
int32_t
tsDescOrder
(
const
void
*
p1
,
const
void
*
p2
)
{
SResultRowCell
*
pc1
=
(
SResultRowCell
*
)
p1
;
SResultRowCell
*
pc2
=
(
SResultRowCell
*
)
p2
;
if
(
pc1
->
groupId
==
pc2
->
groupId
)
{
if
(
pc1
->
pRow
->
win
.
skey
==
pc2
->
pRow
->
win
.
skey
)
{
return
0
;
}
else
{
return
(
pc1
->
pRow
->
win
.
skey
<
pc2
->
pRow
->
win
.
skey
)
?
1
:-
1
;
}
}
else
{
return
(
pc1
->
groupId
<
pc2
->
groupId
)
?
-
1
:
1
;
}
}
void
orderTheResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
__compar_fn_t
fn
=
NULL
;
if
(
pRuntimeEnv
->
pQueryAttr
->
order
.
order
==
TSDB_ORDER_ASC
)
{
fn
=
tsAscOrder
;
}
else
{
fn
=
tsDescOrder
;
}
taosArraySort
(
pRuntimeEnv
->
pResultRowArrayList
,
fn
);
}
static
int32_t
mergeIntoGroupResultImplRv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
uint64_t
groupId
,
int32_t
*
rowCellInfoOffset
)
{
if
(
!
pGroupResInfo
->
ordered
)
{
orderTheResultRows
(
pRuntimeEnv
);
pGroupResInfo
->
ordered
=
true
;
}
if
(
pGroupResInfo
->
pRows
==
NULL
)
{
pGroupResInfo
->
pRows
=
taosArrayInit
(
100
,
POINTER_BYTES
);
}
size_t
len
=
taosArrayGetSize
(
pRuntimeEnv
->
pResultRowArrayList
);
for
(;
pGroupResInfo
->
position
<
len
;
++
pGroupResInfo
->
position
)
{
SResultRowCell
*
pResultRowCell
=
taosArrayGet
(
pRuntimeEnv
->
pResultRowArrayList
,
pGroupResInfo
->
position
);
if
(
pResultRowCell
->
groupId
!=
groupId
)
{
break
;
}
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
pResultRowCell
->
pRow
,
rowCellInfoOffset
);
if
(
num
<=
0
)
{
continue
;
}
taosArrayPush
(
pGroupResInfo
->
pRows
,
&
pResultRowCell
->
pRow
);
pResultRowCell
->
pRow
->
numOfRows
=
(
uint32_t
)
num
;
}
return
TSDB_CODE_SUCCESS
;
}
static
UNUSED_FUNC
int32_t
mergeIntoGroupResultImpl
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pTableList
,
int32_t
*
rowCellInfoOffset
)
{
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pRuntimeEnv
->
pQueryAttr
);
...
...
@@ -562,12 +632,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
int64_t
st
=
taosGetTimestampUs
();
while
(
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pRuntimeEnv
,
pGroupResInfo
->
currentGroup
);
int32_t
ret
=
mergeIntoGroupResultImpl
(
pRuntimeEnv
,
pGroupResInfo
,
group
,
offset
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
mergeIntoGroupResultImplRv
(
pRuntimeEnv
,
pGroupResInfo
,
pGroupResInfo
->
currentGroup
,
offset
);
// this group generates at least one result, return results
if
(
taosArrayGetSize
(
pGroupResInfo
->
pRows
)
>
0
)
{
...
...
@@ -583,7 +648,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
qDebug
(
"QInfo:%"
PRIu64
" merge res data into group, index:%d, total group:%d, elapsed time:%"
PRId64
"us"
,
GET_QID
(
pRuntimeEnv
),
pGroupResInfo
->
currentGroup
,
pGroupResInfo
->
totalGroup
,
elapsedTime
);
// pQInfo->summary.firstStageMergeTime += elapsedTime;
return
TSDB_CODE_SUCCESS
;
}
...
...
src/tsdb/inc/tsdbMeta.h
浏览文件 @
230164c6
...
...
@@ -100,7 +100,7 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k
}
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchemaImpl
(
STable
*
pTable
,
bool
lock
,
bool
copy
,
int16_t
_version
)
{
STable
*
pDTable
=
(
TABLE_TYPE
(
pTable
)
==
TSDB_CHILD_TABLE
)
?
pTable
->
pSuper
:
pTable
;
STable
*
pDTable
=
(
pTable
->
pSuper
!=
NULL
)
?
pTable
->
pSuper
:
pTable
;
// for performance purpose
STSchema
*
pSchema
=
NULL
;
STSchema
*
pTSchema
=
NULL
;
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
230164c6
...
...
@@ -288,8 +288,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
group
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pKeyInfo
->
lastKey
,
.
pTableObj
=
pKeyInfo
->
pTable
};
info
.
tableId
=
((
STable
*
)(
pKeyInfo
->
pTable
))
->
tableId
;
assert
(
info
.
pTableObj
!=
NULL
&&
(
info
.
pTableObj
->
type
==
TSDB_NORMAL_TABLE
||
info
.
pTableObj
->
type
==
TSDB_CHILD_TABLE
||
info
.
pTableObj
->
type
==
TSDB_STREAM_TABLE
));
...
...
@@ -2218,7 +2216,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
SBlock
*
pBlock
=
pTableCheck
->
pCompInfo
->
blocks
;
sup
.
numOfBlocksPerTable
[
numOfQualTables
]
=
pTableCheck
->
numOfBlocks
;
char
*
buf
=
calloc
(
1
,
sizeof
(
STableBlockInfo
)
*
pTableCheck
->
numOfBlocks
);
char
*
buf
=
malloc
(
sizeof
(
STableBlockInfo
)
*
pTableCheck
->
numOfBlocks
);
if
(
buf
==
NULL
)
{
cleanBlockOrderSupporter
(
&
sup
,
numOfQualTables
);
return
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -3618,8 +3616,6 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableKeyInfo
*
pKeyInfo
=
taosArrayGet
(
pTableList
,
i
);
assert
(((
STable
*
)
pKeyInfo
->
pTable
)
->
type
==
TSDB_CHILD_TABLE
);
tsdbRefTable
(
pKeyInfo
->
pTable
);
STableKeyInfo
info
=
{.
pTable
=
pKeyInfo
->
pTable
,
.
lastKey
=
skey
};
...
...
src/util/inc/tlosertree.h
浏览文件 @
230164c6
...
...
@@ -26,7 +26,7 @@ typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param);
typedef
struct
SLoserTreeNode
{
int32_t
index
;
void
*
pData
;
void
*
pData
;
}
SLoserTreeNode
;
typedef
struct
SLoserTreeInfo
{
...
...
@@ -34,8 +34,7 @@ typedef struct SLoserTreeInfo {
int32_t
totalEntries
;
__merge_compare_fn_t
comparFn
;
void
*
param
;
SLoserTreeNode
*
pNode
;
SLoserTreeNode
*
pNode
;
}
SLoserTreeInfo
;
uint32_t
tLoserTreeCreate
(
SLoserTreeInfo
**
pTree
,
int32_t
numOfEntries
,
void
*
param
,
__merge_compare_fn_t
compareFn
);
...
...
src/util/src/hash.c
浏览文件 @
230164c6
...
...
@@ -741,17 +741,19 @@ void taosHashTableResize(SHashObj *pHashObj) {
}
SHashNode
*
doCreateHashNode
(
const
void
*
key
,
size_t
keyLen
,
const
void
*
pData
,
size_t
dsize
,
uint32_t
hashVal
)
{
SHashNode
*
pNewNode
=
calloc
(
1
,
sizeof
(
SHashNode
)
+
keyLen
+
dsize
);
SHashNode
*
pNewNode
=
malloc
(
sizeof
(
SHashNode
)
+
keyLen
+
dsize
);
if
(
pNewNode
==
NULL
)
{
uError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
pNewNode
->
keyLen
=
(
uint32_t
)
keyLen
;
pNewNode
->
keyLen
=
(
uint32_t
)
keyLen
;
pNewNode
->
hashVal
=
hashVal
;
pNewNode
->
dataLen
=
(
uint32_t
)
dsize
;
pNewNode
->
count
=
1
;
pNewNode
->
count
=
1
;
pNewNode
->
removed
=
0
;
pNewNode
->
next
=
NULL
;
memcpy
(
GET_HASH_NODE_DATA
(
pNewNode
),
pData
,
dsize
);
memcpy
(
GET_HASH_NODE_KEY
(
pNewNode
),
key
,
keyLen
);
...
...
src/util/src/tarray.c
浏览文件 @
230164c6
...
...
@@ -24,11 +24,12 @@ void* taosArrayInit(size_t size, size_t elemSize) {
size
=
TARRAY_MIN_SIZE
;
}
SArray
*
pArray
=
calloc
(
1
,
sizeof
(
SArray
));
SArray
*
pArray
=
malloc
(
sizeof
(
SArray
));
if
(
pArray
==
NULL
)
{
return
NULL
;
}
pArray
->
size
=
0
;
pArray
->
pData
=
calloc
(
size
,
elemSize
);
if
(
pArray
->
pData
==
NULL
)
{
free
(
pArray
);
...
...
src/util/src/tlosertree.c
浏览文件 @
230164c6
...
...
@@ -90,12 +90,13 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
SLoserTreeNode
kLeaf
=
pTree
->
pNode
[
idx
];
while
(
parentId
>
0
)
{
if
(
pTree
->
pNode
[
parentId
].
index
==
-
1
)
{
SLoserTreeNode
*
pCur
=
&
pTree
->
pNode
[
parentId
];
if
(
pCur
->
index
==
-
1
)
{
pTree
->
pNode
[
parentId
]
=
kLeaf
;
return
;
}
int32_t
ret
=
pTree
->
comparFn
(
&
pTree
->
pNode
[
parentId
]
,
&
kLeaf
,
pTree
->
param
);
int32_t
ret
=
pTree
->
comparFn
(
pCur
,
&
kLeaf
,
pTree
->
param
);
if
(
ret
<
0
)
{
SLoserTreeNode
t
=
pTree
->
pNode
[
parentId
];
pTree
->
pNode
[
parentId
]
=
kLeaf
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录