Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
62127398
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
62127398
编写于
12月 24, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-2378]<enhance>: optimize the client memory requirement.
上级
172f11ca
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
59 addition
and
281 deletion
+59
-281
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+2
-1
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+0
-1
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+1
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+1
-1
src/client/src/tscSchemaUtil.c
src/client/src/tscSchemaUtil.c
+11
-19
src/client/src/tscServer.c
src/client/src/tscServer.c
+28
-63
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+13
-33
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+0
-4
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+3
-4
src/query/src/qUtil.c
src/query/src/qUtil.c
+0
-154
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
62127398
...
...
@@ -69,9 +69,10 @@ typedef struct STableMeta {
int16_t
sversion
;
int16_t
tversion
;
char
sTableId
[
TSDB_TABLE_FNAME_LEN
];
SVgroupInfo
vgroupInfo
;
int32_t
vgId
;
SCorVgroupInfo
corVgroupInfo
;
STableId
id
;
// union {int64_t stableUid; SSchema* schema;};
SSchema
schema
[];
// if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
}
STableMeta
;
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
62127398
...
...
@@ -2625,7 +2625,6 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
pInfo
->
pHisto
=
tHistogramCreateFrom
(
tmp
,
MAX_HISTOGRAM_BIN
);
printf
(
"%p, %p
\n
"
,
pInfo
->
pHisto
,
pInfo
->
pHisto
->
elems
);
return
true
;
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
62127398
...
...
@@ -731,7 +731,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI
return
code
;
}
dataBuf
->
vgId
=
pTableMeta
->
vg
roupInfo
.
vg
Id
;
dataBuf
->
vgId
=
pTableMeta
->
vgId
;
dataBuf
->
numOfTables
=
1
;
*
totalNum
+=
numOfRows
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
62127398
...
...
@@ -4936,7 +4936,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
pCmd
->
payload
;
pUpdateMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vg
roupInfo
.
vg
Id
);
pUpdateMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pUpdateMsg
->
tid
=
htonl
(
pTableMeta
->
id
.
tid
);
pUpdateMsg
->
uid
=
htobe64
(
pTableMeta
->
id
.
uid
);
pUpdateMsg
->
colId
=
htons
(
pTagsSchema
->
colId
);
...
...
src/client/src/tscSchemaUtil.c
浏览文件 @
62127398
...
...
@@ -130,13 +130,14 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
return
NULL
;
}
static
void
tscInitCorVgroupInfo
(
SCorVgroupInfo
*
corVgroupInfo
,
SVgroup
Info
*
vgroupInfo
)
{
static
void
tscInitCorVgroupInfo
(
SCorVgroupInfo
*
corVgroupInfo
,
SVgroup
Msg
*
pVgroupMsg
)
{
corVgroupInfo
->
version
=
0
;
corVgroupInfo
->
inUse
=
0
;
corVgroupInfo
->
numOfEps
=
vgroupInfo
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
corVgroupInfo
->
numOfEps
;
i
++
)
{
corVgroupInfo
->
epAddr
[
i
].
fqdn
=
strdup
(
vgroupInfo
->
epAddr
[
i
].
fqdn
);
corVgroupInfo
->
epAddr
[
i
].
port
=
vgroupInfo
->
epAddr
[
i
].
port
;
corVgroupInfo
->
numOfEps
=
pVgroupMsg
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
pVgroupMsg
->
numOfEps
;
i
++
)
{
corVgroupInfo
->
epAddr
[
i
].
fqdn
=
strndup
(
pVgroupMsg
->
epAddr
[
i
].
fqdn
,
tListLen
(
pVgroupMsg
->
epAddr
[
0
].
fqdn
));
corVgroupInfo
->
epAddr
[
i
].
port
=
pVgroupMsg
->
epAddr
[
i
].
port
;
}
}
...
...
@@ -145,7 +146,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
int32_t
schemaSize
=
(
pTableMetaMsg
->
numOfColumns
+
pTableMetaMsg
->
numOfTags
)
*
sizeof
(
SSchema
);
STableMeta
*
pTableMeta
=
calloc
(
1
,
sizeof
(
STableMeta
)
+
schemaSize
);
pTableMeta
->
tableType
=
pTableMetaMsg
->
tableType
;
pTableMeta
->
vgId
=
pTableMetaMsg
->
vgroup
.
vgId
;
pTableMeta
->
tableInfo
=
(
STableComInfo
)
{
.
numOfTags
=
pTableMetaMsg
->
numOfTags
,
...
...
@@ -156,18 +159,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta
->
id
.
tid
=
pTableMetaMsg
->
tid
;
pTableMeta
->
id
.
uid
=
pTableMetaMsg
->
uid
;
SVgroupInfo
*
pVgroupInfo
=
&
pTableMeta
->
vgroupInfo
;
pVgroupInfo
->
numOfEps
=
pTableMetaMsg
->
vgroup
.
numOfEps
;
pVgroupInfo
->
vgId
=
pTableMetaMsg
->
vgroup
.
vgId
;
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfEps
;
++
i
)
{
SEpAddrMsg
*
pEpMsg
=
&
pTableMetaMsg
->
vgroup
.
epAddr
[
i
];
pVgroupInfo
->
epAddr
[
i
].
fqdn
=
strndup
(
pEpMsg
->
fqdn
,
tListLen
(
pEpMsg
->
fqdn
));
pVgroupInfo
->
epAddr
[
i
].
port
=
pEpMsg
->
port
;
}
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
pVgroupInfo
);
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
&
pTableMetaMsg
->
vgroup
);
pTableMeta
->
sversion
=
pTableMetaMsg
->
sversion
;
pTableMeta
->
tversion
=
pTableMetaMsg
->
tversion
;
...
...
src/client/src/tscServer.c
浏览文件 @
62127398
...
...
@@ -45,32 +45,30 @@ static int32_t getWaitingTimeInterval(int32_t count) {
return
0
;
}
return
initial
*
(
2
<<
(
count
-
2
));
return
initial
*
(
(
2u
)
<<
(
count
-
2
));
}
static
void
tscSetDnodeEpSet
(
SSqlObj
*
pSql
,
SVgroupInfo
*
pVgroupInfo
)
{
assert
(
pSql
!=
NULL
&&
pVgroupInfo
!=
NULL
&&
pVgroupInfo
->
numOfEps
>
0
);
SRpcEpSet
*
pEpSet
=
&
pSql
->
epSet
;
static
void
tscSetDnodeEpSet
(
SRpcEpSet
*
pEpSet
,
SVgroupInfo
*
pVgroupInfo
)
{
assert
(
pEpSet
!=
NULL
&&
pVgroupInfo
!=
NULL
&&
pVgroupInfo
->
numOfEps
>
0
);
// Issue the query to one of the vnode among a vgroup randomly.
// change the inUse property would not affect the isUse attribute of STableMeta
pEpSet
->
inUse
=
rand
()
%
pVgroupInfo
->
numOfEps
;
// apply the FQDN string length check here
bool
hasFqdn
=
false
;
bool
existed
=
false
;
pEpSet
->
numOfEps
=
pVgroupInfo
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfEps
;
++
i
)
{
tstrncpy
(
pEpSet
->
fqdn
[
i
],
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
tListLen
(
pEpSet
->
fqdn
[
i
]));
pEpSet
->
port
[
i
]
=
pVgroupInfo
->
epAddr
[
i
].
port
;
if
(
!
hasFqdn
)
{
hasFqdn
=
(
strlen
(
pEpSet
->
fqdn
[
i
])
>
0
);
int32_t
len
=
(
int32_t
)
strnlen
(
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
if
(
len
>
0
)
{
tstrncpy
(
pEpSet
->
fqdn
[
i
],
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
tListLen
(
pEpSet
->
fqdn
[
i
]));
existed
=
true
;
}
}
assert
(
hasFqdn
);
assert
(
existed
);
}
static
void
tscDumpMgmtEpSet
(
SSqlObj
*
pSql
)
{
...
...
@@ -102,7 +100,8 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
pCorEpSet
->
epSet
=
*
pEpSet
;
taosCorEndWrite
(
&
pCorEpSet
->
version
);
}
static
void
tscDumpEpSetFromVgroupInfo
(
SCorVgroupInfo
*
pVgroupInfo
,
SRpcEpSet
*
pEpSet
)
{
static
void
tscDumpEpSetFromVgroupInfo
(
SRpcEpSet
*
pEpSet
,
SCorVgroupInfo
*
pVgroupInfo
)
{
if
(
pVgroupInfo
==
NULL
)
{
return
;}
taosCorBeginRead
(
&
pVgroupInfo
->
version
);
int8_t
inUse
=
pVgroupInfo
->
inUse
;
...
...
@@ -515,8 +514,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
else
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vg
roupInfo
.
vg
Id
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d"
,
pSql
,
pTableMeta
->
vg
roupInfo
.
vg
Id
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d"
,
pSql
,
pTableMeta
->
vgId
);
}
pSql
->
cmd
.
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
...
...
@@ -535,7 +534,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// NOTE: shell message size should not include SMsgDesc
int32_t
size
=
pSql
->
cmd
.
payloadLen
-
sizeof
(
SMsgDesc
);
int32_t
vgId
=
pTableMeta
->
vgroupInfo
.
vgId
;
SMsgDesc
*
pMsgDesc
=
(
SMsgDesc
*
)
pMsg
;
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
// always one vnode
...
...
@@ -543,7 +541,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg
+=
sizeof
(
SMsgDesc
);
SSubmitMsg
*
pShellMsg
=
(
SSubmitMsg
*
)
pMsg
;
pShellMsg
->
header
.
vgId
=
htonl
(
vgId
);
pShellMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pShellMsg
->
header
.
contLen
=
htonl
(
size
);
// the length not includes the size of SMsgDesc
pShellMsg
->
length
=
pShellMsg
->
header
.
contLen
;
...
...
@@ -551,9 +549,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload
pSql
->
cmd
.
msgType
=
TSDB_MSG_TYPE_SUBMIT
;
tscDumpEpSetFromVgroupInfo
(
&
p
TableMeta
->
corVgroupInfo
,
&
pSql
->
epSet
);
tscDumpEpSetFromVgroupInfo
(
&
p
Sql
->
epSet
,
&
pTableMeta
->
corVgroupInfo
);
tscDebug
(
"%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
,
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
tscDebug
(
"%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
,
pTableMeta
->
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
pSql
->
epSet
.
numOfEps
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -597,24 +595,28 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
UTIL_TABLE_IS_NORMAL_TABLE
(
pTableMetaInfo
)
||
pTableMetaInfo
->
pVgroupTables
==
NULL
)
{
SVgroupInfo
*
pVgroupInfo
=
NULL
;
int32_t
vgId
=
-
1
;
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
int32_t
index
=
pTableMetaInfo
->
vgroupIndex
;
assert
(
index
>=
0
);
SVgroupInfo
*
pVgroupInfo
=
NULL
;
if
(
pTableMetaInfo
->
vgroupList
->
numOfVgroups
>
0
)
{
assert
(
index
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
index
];
}
vgId
=
pVgroupInfo
->
vgId
;
tscSetDnodeEpSet
(
&
pSql
->
epSet
,
pVgroupInfo
);
tscDebug
(
"%p query on stable, vgIndex:%d, numOfVgroups:%d"
,
pSql
,
index
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
}
else
{
pVgroupInfo
=
&
pTableMeta
->
vgroupInfo
;
vgId
=
pTableMeta
->
vgId
;
tscDumpEpSetFromVgroupInfo
(
&
pSql
->
epSet
,
&
pTableMeta
->
corVgroupInfo
);
}
assert
(
pVgroupInfo
!=
NULL
)
;
pSql
->
epSet
.
inUse
=
rand
()
%
pSql
->
epSet
.
numOfEps
;
tscSetDnodeEpSet
(
pSql
,
pVgroupInfo
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pVgroupInfo
->
vgId
);
pQueryMsg
->
head
.
vgId
=
htonl
(
vgId
);
STableIdInfo
*
pTableIdInfo
=
(
STableIdInfo
*
)
pMsg
;
pTableIdInfo
->
tid
=
htonl
(
pTableMeta
->
id
.
tid
);
...
...
@@ -633,7 +635,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
SVgroupTableInfo
*
pTableIdList
=
taosArrayGet
(
pTableMetaInfo
->
pVgroupTables
,
index
);
// set the vgroup info
tscSetDnodeEpSet
(
pSql
,
&
pTableIdList
->
vgInfo
);
tscSetDnodeEpSet
(
&
pSql
->
epSet
,
&
pTableIdList
->
vgInfo
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pTableIdList
->
vgInfo
.
vgId
);
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pTableIdList
->
itemList
);
...
...
@@ -1448,48 +1450,11 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
tscDumpEpSetFromVgroupInfo
(
&
p
TableMetaInfo
->
pTableMeta
->
corVgroupInfo
,
&
pSql
->
epSet
);
tscDumpEpSetFromVgroupInfo
(
&
p
Sql
->
epSet
,
&
pTableMetaInfo
->
pTableMeta
->
corVgroupInfo
);
return
TSDB_CODE_SUCCESS
;
}
//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
// pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
//
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// int32_t vgIndex = pTableMetaInfo->vgroupIndex;
// if (pTableMetaInfo->pVgroupTables == NULL) {
// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
//
// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
// } else {
// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
// assert(vgIndex >= 0 && vgIndex < numOfVgroups);
//
// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
//
// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
// }
// } else {
// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
// }
//
// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
//
// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
// return TSDB_CODE_SUCCESS;
//}
int
tscAlterDbMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
payloadLen
=
sizeof
(
SAlterDbMsg
);
...
...
src/client/src/tscUtil.c
浏览文件 @
62127398
...
...
@@ -466,13 +466,6 @@ void tscFreeRegisteredSqlObj(void *pSql) {
void
tscFreeTableMetaHelper
(
void
*
pTableMeta
)
{
STableMeta
*
p
=
(
STableMeta
*
)
pTableMeta
;
int32_t
numOfEps
=
p
->
vgroupInfo
.
numOfEps
;
assert
(
numOfEps
>=
0
&&
numOfEps
<=
TSDB_MAX_REPLICA
);
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
++
i
)
{
tfree
(
p
->
vgroupInfo
.
epAddr
[
i
].
fqdn
);
}
int32_t
numOfEps1
=
p
->
corVgroupInfo
.
numOfEps
;
assert
(
numOfEps1
>=
0
&&
numOfEps1
<=
TSDB_MAX_REPLICA
);
...
...
@@ -1944,27 +1937,21 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
pNew
->
fp
=
fp
;
pNew
->
fetchFp
=
fp
;
pNew
->
param
=
param
;
pNew
->
sqlstr
=
NULL
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
pNew
->
sqlstr
=
strdup
(
pSql
->
sqlstr
);
if
(
pNew
->
sqlstr
==
NULL
)
{
tscError
(
"%p new subquery failed"
,
pSql
);
tscFreeSqlObj
(
pNew
);
return
NULL
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetailSafely
(
pCmd
,
0
);
assert
(
pSql
->
cmd
.
clauseIndex
==
0
);
STableMetaInfo
*
pMasterTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
,
0
);
tscAddTableMetaInfo
(
pQueryInfo
,
pMasterTableMetaInfo
->
name
,
NULL
,
NULL
,
NULL
,
NULL
);
registerSqlObj
(
pNew
);
return
pNew
;
}
static
void
doSetSqlExprAndResultFieldInfo
(
SQueryInfo
*
p
QueryInfo
,
SQueryInfo
*
p
NewQueryInfo
,
int64_t
uid
)
{
static
void
doSetSqlExprAndResultFieldInfo
(
SQueryInfo
*
pNewQueryInfo
,
int64_t
uid
)
{
int32_t
numOfOutput
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pNewQueryInfo
);
if
(
numOfOutput
==
0
)
{
return
;
...
...
@@ -2019,13 +2006,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew
->
pTscObj
=
pSql
->
pTscObj
;
pNew
->
signature
=
pNew
;
pNew
->
sqlstr
=
strdup
(
pSql
->
sqlstr
);
if
(
pNew
->
sqlstr
==
NULL
)
{
tscError
(
"%p new subquery failed, tableIndex:%d, vgroupIndex:%d"
,
pSql
,
tableIndex
,
pTableMetaInfo
->
vgroupIndex
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
pNew
->
sqlstr
=
NULL
;
SSqlCmd
*
pnCmd
=
&
pNew
->
cmd
;
memcpy
(
pnCmd
,
pCmd
,
sizeof
(
SSqlCmd
));
...
...
@@ -2113,19 +2094,18 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
goto
_error
;
}
doSetSqlExprAndResultFieldInfo
(
p
QueryInfo
,
p
NewQueryInfo
,
uid
);
doSetSqlExprAndResultFieldInfo
(
pNewQueryInfo
,
uid
);
pNew
->
fp
=
fp
;
pNew
->
fetchFp
=
fp
;
pNew
->
param
=
param
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
char
*
name
=
pTableMetaInfo
->
name
;
STableMetaInfo
*
pFinalInfo
=
NULL
;
if
(
pPrevSql
==
NULL
)
{
STableMeta
*
pTableMeta
=
taosCacheAcquireByData
(
tscMetaCache
,
pTableMetaInfo
->
pTableMeta
);
// get by name may failed due to the cache cleanup
if
(
pPrevSql
==
NULL
)
{
// get by name may failed due to the cache cleanup
STableMeta
*
pTableMeta
=
taosCacheAcquireByData
(
tscMetaCache
,
pTableMetaInfo
->
pTableMeta
);
assert
(
pTableMeta
!=
NULL
);
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pTableMeta
,
pTableMetaInfo
->
vgroupList
,
...
...
src/query/inc/qUtil.h
浏览文件 @
62127398
...
...
@@ -34,17 +34,13 @@ int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
resetResultRowInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
void
popFrontResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
num
);
void
clearClosedResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
removeRedundantResultRows
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
clearResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
);
void
copyResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
dst
,
const
SResultRow
*
src
,
int16_t
type
);
SResultRowCellInfo
*
getResultCell
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
const
SResultRow
*
pRow
,
int32_t
index
);
...
...
src/query/src/qExecutor.c
浏览文件 @
62127398
...
...
@@ -5182,10 +5182,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
scanMultiTableDataBlocks
(
pQInfo
);
pQInfo
->
groupIndex
+=
1
;
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
taosArrayDestroy
(
s
)
;
// no results generated for current group, continue to try the next group
taosArrayDestroy
(
s
)
;
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
pWindowResInfo
->
size
<=
0
)
{
continue
;
}
...
...
@@ -5212,8 +5212,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQInfo
->
groupIndex
=
currentGroupIndex
;
// restore the group index
assert
(
pQuery
->
rec
.
rows
==
pWindowResInfo
->
size
);
clearClosedResultRows
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
resetResultRowInfo
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
break
;
}
}
else
if
(
pRuntimeEnv
->
queryWindowIdentical
&&
pRuntimeEnv
->
pTsBuf
==
NULL
&&
!
isTSCompQuery
(
pQuery
))
{
...
...
src/query/src/qUtil.c
浏览文件 @
62127398
...
...
@@ -20,18 +20,6 @@
#include "qExecutor.h"
#include "qUtil.h"
static
int32_t
getResultRowKeyInfo
(
SResultRow
*
pResult
,
int16_t
type
,
char
**
key
,
int16_t
*
bytes
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
*
key
=
varDataVal
(
pResult
->
key
);
*
bytes
=
varDataLen
(
pResult
->
key
);
}
else
{
*
key
=
(
char
*
)
&
pResult
->
win
.
skey
;
*
bytes
=
tDataTypeDesc
[
type
].
nSize
;
}
return
0
;
}
int32_t
getOutputInterResultBufSize
(
SQuery
*
pQuery
)
{
int32_t
size
=
0
;
...
...
@@ -99,73 +87,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
}
void
popFrontResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
num
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
||
pResultRowInfo
->
size
==
0
||
num
==
0
)
{
return
;
}
int32_t
numOfClosed
=
numOfClosedResultRows
(
pResultRowInfo
);
assert
(
num
>=
0
&&
num
<=
numOfClosed
);
int16_t
type
=
pResultRowInfo
->
type
;
int64_t
uid
=
0
;
char
*
key
=
NULL
;
int16_t
bytes
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SResultRow
*
pResult
=
pResultRowInfo
->
pResult
[
i
];
if
(
pResult
->
closed
)
{
// remove the window slot from hash table
getResultRowKeyInfo
(
pResult
,
type
,
&
key
,
&
bytes
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
taosHashRemove
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
}
else
{
break
;
}
}
int32_t
remain
=
pResultRowInfo
->
size
-
num
;
// clear all the closed windows from the window list
for
(
int32_t
k
=
0
;
k
<
remain
;
++
k
)
{
copyResultRow
(
pRuntimeEnv
,
pResultRowInfo
->
pResult
[
k
],
pResultRowInfo
->
pResult
[
num
+
k
],
type
);
}
// move the unclosed window in the front of the window list
for
(
int32_t
k
=
remain
;
k
<
pResultRowInfo
->
size
;
++
k
)
{
SResultRow
*
pWindowRes
=
pResultRowInfo
->
pResult
[
k
];
clearResultRow
(
pRuntimeEnv
,
pWindowRes
,
pResultRowInfo
->
type
);
}
pResultRowInfo
->
size
=
remain
;
for
(
int32_t
k
=
0
;
k
<
pResultRowInfo
->
size
;
++
k
)
{
SResultRow
*
pResult
=
pResultRowInfo
->
pResult
[
k
];
getResultRowKeyInfo
(
pResult
,
type
,
&
key
,
&
bytes
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
assert
(
p
!=
NULL
);
int32_t
v
=
(
*
p
-
num
);
assert
(
v
>=
0
&&
v
<=
pResultRowInfo
->
size
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
taosHashPut
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
pResultRowInfo
->
curIndex
=
-
1
;
}
void
clearClosedResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
||
pResultRowInfo
->
size
==
0
)
{
return
;
}
int32_t
numOfClosed
=
numOfClosedResultRows
(
pResultRowInfo
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
numOfClosed
);
}
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
int32_t
i
=
0
;
while
(
i
<
pResultRowInfo
->
size
&&
pResultRowInfo
->
pResult
[
i
]
->
closed
)
{
...
...
@@ -188,40 +109,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
}
}
/*
* remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order
*/
void
removeRedundantResultRows
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
)
{
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
if
(
pResultRowInfo
->
size
<=
1
)
{
return
;
}
// get the result order
int32_t
resultOrder
=
(
pResultRowInfo
->
pResult
[
0
]
->
win
.
skey
<
pResultRowInfo
->
pResult
[
1
]
->
win
.
skey
)
?
1
:-
1
;
if
(
order
!=
resultOrder
)
{
return
;
}
int32_t
i
=
0
;
if
(
order
==
QUERY_ASC_FORWARD_STEP
)
{
TSKEY
ekey
=
pResultRowInfo
->
pResult
[
i
]
->
win
.
ekey
;
while
(
i
<
pResultRowInfo
->
size
&&
(
ekey
<
lastKey
))
{
++
i
;
}
}
else
if
(
order
==
QUERY_DESC_FORWARD_STEP
)
{
while
(
i
<
pResultRowInfo
->
size
&&
(
pResultRowInfo
->
pResult
[
i
]
->
win
.
skey
>
lastKey
))
{
++
i
;
}
}
if
(
i
<
pResultRowInfo
->
size
)
{
pResultRowInfo
->
size
=
(
i
+
1
);
}
}
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
return
(
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
==
true
);
}
...
...
@@ -262,47 +149,6 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
}
}
/**
* The source window result pos attribution of the source window result does not assign to the destination,
* since the attribute of "Pos" is bound to each window result when the window result is created in the
* disk-based result buffer.
*/
void
copyResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
dst
,
const
SResultRow
*
src
,
int16_t
type
)
{
dst
->
numOfRows
=
src
->
numOfRows
;
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
dst
->
key
=
realloc
(
dst
->
key
,
varDataTLen
(
src
->
key
));
varDataCopy
(
dst
->
key
,
src
->
key
);
}
else
{
dst
->
win
=
src
->
win
;
}
dst
->
closed
=
src
->
closed
;
int32_t
nOutputCols
=
pRuntimeEnv
->
pQuery
->
numOfOutput
;
for
(
int32_t
i
=
0
;
i
<
nOutputCols
;
++
i
)
{
SResultRowCellInfo
*
pDst
=
getResultCell
(
pRuntimeEnv
,
dst
,
i
);
SResultRowCellInfo
*
pSrc
=
getResultCell
(
pRuntimeEnv
,
src
,
i
);
// char *buf = pDst->interResultBuf;
memcpy
(
pDst
,
pSrc
,
sizeof
(
SResultRowCellInfo
)
+
pRuntimeEnv
->
pCtx
[
i
].
interBufBytes
);
// pDst->interResultBuf = buf; // restore the allocated buffer
// copy the result info struct
// memcpy(pDst->interResultBuf, pSrc->interResultBuf, pRuntimeEnv->pCtx[i].interBufBytes);
// copy the output buffer data from src to dst, the position info keep unchanged
tFilePage
*
dstpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
dst
->
pageId
);
char
*
dstBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
dst
,
dstpage
);
tFilePage
*
srcpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
src
->
pageId
);
char
*
srcBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
(
SResultRow
*
)
src
,
srcpage
);
size_t
s
=
pRuntimeEnv
->
pQuery
->
pExpr1
[
i
].
bytes
;
memcpy
(
dstBuf
,
srcBuf
,
s
);
}
}
SResultRowCellInfo
*
getResultCell
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
const
SResultRow
*
pRow
,
int32_t
index
)
{
assert
(
index
>=
0
&&
index
<
pRuntimeEnv
->
pQuery
->
numOfOutput
);
return
(
SResultRowCellInfo
*
)((
char
*
)
pRow
->
pCellInfo
+
pRuntimeEnv
->
rowCellInfoOffset
[
index
]);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录