Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ac319773
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ac319773
编写于
12月 27, 2020
作者:
H
haojun Liao
提交者:
GitHub
12月 27, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4712 from taosdata/feature/query
Feature/query
上级
6d04a266
04e5bb2c
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
124 addition
and
352 deletion
+124
-352
src/client/inc/tscLocalMerge.h
src/client/inc/tscLocalMerge.h
+0
-7
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+11
-9
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+0
-1
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+6
-27
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+1
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+8
-2
src/client/src/tscSchemaUtil.c
src/client/src/tscSchemaUtil.c
+11
-19
src/client/src/tscServer.c
src/client/src/tscServer.c
+28
-63
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+9
-9
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+23
-34
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+0
-4
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+15
-11
src/query/src/qUtil.c
src/query/src/qUtil.c
+0
-154
src/util/inc/hash.h
src/util/inc/hash.h
+7
-6
src/util/src/hash.c
src/util/src/hash.c
+3
-3
src/util/src/tcache.c
src/util/src/tcache.c
+1
-1
src/vnode/src/vnodeMgmt.c
src/vnode/src/vnodeMgmt.c
+1
-1
未找到文件。
src/client/inc/tscLocalMerge.h
浏览文件 @
ac319773
...
...
@@ -38,12 +38,6 @@ typedef struct SLocalDataSource {
tFilePage
filePage
;
}
SLocalDataSource
;
enum
{
TSC_LOCALREDUCE_READY
=
0x0
,
TSC_LOCALREDUCE_IN_PROGRESS
=
0x1
,
TSC_LOCALREDUCE_TOBE_FREED
=
0x2
,
};
typedef
struct
SLocalReducer
{
SLocalDataSource
**
pLocalDataSrc
;
int32_t
numOfBuffer
;
...
...
@@ -56,7 +50,6 @@ typedef struct SLocalReducer {
tFilePage
*
pTempBuffer
;
struct
SQLFunctionCtx
*
pCtx
;
int32_t
rowSize
;
// size of each intermediate result.
int32_t
status
;
// denote it is in reduce process, in reduce process, it
bool
hasPrevRow
;
// cannot be released
bool
hasUnprocessedRow
;
tOrderDescriptor
*
pDesc
;
...
...
src/client/inc/tsclient.h
浏览文件 @
ac319773
...
...
@@ -69,9 +69,10 @@ typedef struct STableMeta {
int16_t
sversion
;
int16_t
tversion
;
char
sTableId
[
TSDB_TABLE_FNAME_LEN
];
SVgroupInfo
vgroupInfo
;
int32_t
vgId
;
SCorVgroupInfo
corVgroupInfo
;
STableId
id
;
// union {int64_t stableUid; SSchema* schema;};
SSchema
schema
[];
// if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
}
STableMeta
;
...
...
@@ -307,6 +308,7 @@ typedef struct STscObj {
SRpcCorEpSet
*
tscCorMgmtEpSet
;
void
*
pDnodeConn
;
pthread_mutex_t
mutex
;
int32_t
numOfObj
;
// number of sqlObj from this tscObj
}
STscObj
;
typedef
struct
SSubqueryState
{
...
...
@@ -477,14 +479,14 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
}
}
extern
SCacheObj
*
tscMetaCache
;
extern
int
tscObjRef
;
extern
void
*
tscTmr
;
extern
void
*
tscQhandle
;
extern
int
tscKeepConn
[]
;
extern
int
tscNumOfThreads
;
extern
int
tscRefId
;
extern
SCacheObj
*
tscMetaCache
;
extern
int
tscObjRef
;
extern
void
*
tscTmr
;
extern
void
*
tscQhandle
;
extern
int
tscKeepConn
[]
;
extern
int
tscRefId
;
extern
int
tscNumOfObj
;
// number of existed sqlObj in current process.
extern
int
(
*
tscBuildMsg
[
TSDB_SQL_MAX
])(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
ac319773
...
...
@@ -2625,7 +2625,6 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char
*
tmp
=
(
char
*
)
pInfo
+
sizeof
(
SAPercentileInfo
);
pInfo
->
pHisto
=
tHistogramCreateFrom
(
tmp
,
MAX_HISTOGRAM_BIN
);
printf
(
"%p, %p
\n
"
,
pInfo
->
pHisto
,
pInfo
->
pHisto
->
elems
);
return
true
;
}
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
ac319773
...
...
@@ -93,7 +93,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
// for top/bottom function, the output of timestamp is the first column
int32_t
functionId
=
pExpr
->
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
)
{
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pCtx
->
ptsOutputBuf
=
pReducer
->
pCtx
[
0
].
aOutputBuf
;
pCtx
->
param
[
2
].
i64Key
=
pQueryInfo
->
order
.
order
;
pCtx
->
param
[
2
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
...
...
@@ -493,13 +493,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
// there is no more result, so we release all allocated resource
SLocalReducer
*
pLocalReducer
=
(
SLocalReducer
*
)
atomic_exchange_ptr
(
&
pRes
->
pLocalReducer
,
NULL
);
if
(
pLocalReducer
!=
NULL
)
{
int32_t
status
=
0
;
while
((
status
=
atomic_val_compare_exchange_32
(
&
pLocalReducer
->
status
,
TSC_LOCALREDUCE_READY
,
TSC_LOCALREDUCE_TOBE_FREED
))
==
TSC_LOCALREDUCE_IN_PROGRESS
)
{
taosMsleep
(
100
);
tscDebug
(
"%p waiting for delete procedure, status: %d"
,
pSql
,
status
);
}
pLocalReducer
->
pFillInfo
=
taosDestroyFillInfo
(
pLocalReducer
->
pFillInfo
);
if
(
pLocalReducer
->
pCtx
!=
NULL
)
{
...
...
@@ -1303,6 +1296,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// re
for
(
int32_t
i
=
0
;
i
<
t
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
pLocalReducer
->
pCtx
[
i
].
aOutputBuf
=
pLocalReducer
->
pResultBuf
->
data
+
pExpr
->
offset
*
pLocalReducer
->
resColModel
->
capacity
;
if
(
pExpr
->
functionId
==
TSDB_FUNC_TOP
||
pExpr
->
functionId
==
TSDB_FUNC_BOTTOM
||
pExpr
->
functionId
==
TSDB_FUNC_DIFF
)
{
pLocalReducer
->
pCtx
[
i
].
ptsOutputBuf
=
pLocalReducer
->
pCtx
[
0
].
aOutputBuf
;
}
}
memset
(
pLocalReducer
->
pResultBuf
,
0
,
pLocalReducer
->
nResultBufSize
+
sizeof
(
tFilePage
));
...
...
@@ -1437,24 +1434,13 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
// set the data merge in progress
int32_t
prevStatus
=
atomic_val_compare_exchange_32
(
&
pLocalReducer
->
status
,
TSC_LOCALREDUCE_READY
,
TSC_LOCALREDUCE_IN_PROGRESS
);
if
(
prevStatus
!=
TSC_LOCALREDUCE_READY
)
{
assert
(
prevStatus
==
TSC_LOCALREDUCE_TOBE_FREED
);
// it is in tscDestroyLocalReducer function already
return
TSDB_CODE_SUCCESS
;
}
tFilePage
*
tmpBuffer
=
pLocalReducer
->
pTempBuffer
;
tFilePage
*
tmpBuffer
=
pLocalReducer
->
pTempBuffer
;
if
(
doHandleLastRemainData
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
if
(
doBuildFilledResultForGroup
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1510,7 +1496,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
pLocalReducer
->
discardData
->
num
=
0
;
if
(
saveGroupResultInfo
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1556,7 +1541,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
// here we do not check the return value
adjustLoserTreeFromNewData
(
pLocalReducer
,
pOneDataSrc
,
pTree
);
assert
(
pLocalReducer
->
status
==
TSC_LOCALREDUCE_IN_PROGRESS
);
if
(
pRes
->
numOfRows
==
0
)
{
handleUnprocessedRow
(
pCmd
,
pLocalReducer
,
tmpBuffer
);
...
...
@@ -1567,7 +1551,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* If previous group is not skipped, keep it in pRes->numOfGroups
*/
if
(
notSkipped
&&
saveGroupResultInfo
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1587,7 +1570,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
if
(
pRes
->
numOfRows
==
0
)
{
continue
;
}
else
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
}
else
{
// result buffer is not full
...
...
@@ -1612,9 +1594,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
genFinalResults
(
pSql
,
pLocalReducer
,
true
);
}
assert
(
pLocalReducer
->
status
==
TSC_LOCALREDUCE_IN_PROGRESS
&&
pRes
->
row
==
0
);
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
ac319773
...
...
@@ -731,7 +731,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI
return
code
;
}
dataBuf
->
vgId
=
pTableMeta
->
vg
roupInfo
.
vg
Id
;
dataBuf
->
vgId
=
pTableMeta
->
vgId
;
dataBuf
->
numOfTables
=
1
;
*
totalNum
+=
numOfRows
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
ac319773
...
...
@@ -666,6 +666,7 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
const
char
*
msg1
=
"invalid query expression"
;
const
char
*
msg2
=
"interval cannot be less than 10 ms"
;
const
char
*
msg3
=
"sliding cannot be used without interval"
;
const
char
*
msg4
=
"top/bottom query does not support order by value in interval query"
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -712,6 +713,11 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
return
TSDB_CODE_TSC_INVALID_SQL
;
}
int32_t
colId
=
pQueryInfo
->
order
.
orderColId
;
if
(
pQueryInfo
->
interval
.
interval
>
0
&&
colId
!=
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg4
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4646,7 +4652,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
if
(
!
(
orderByTags
||
orderByTS
)
&&
!
isTopBottomQuery
(
pQueryInfo
))
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
else
{
}
else
{
// order by top/bottom result value column is not supported in case of interval query.
assert
(
!
(
orderByTags
&&
orderByTS
));
}
...
...
@@ -4936,7 +4942,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
pCmd
->
payload
;
pUpdateMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vg
roupInfo
.
vg
Id
);
pUpdateMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pUpdateMsg
->
tid
=
htonl
(
pTableMeta
->
id
.
tid
);
pUpdateMsg
->
uid
=
htobe64
(
pTableMeta
->
id
.
uid
);
pUpdateMsg
->
colId
=
htons
(
pTagsSchema
->
colId
);
...
...
src/client/src/tscSchemaUtil.c
浏览文件 @
ac319773
...
...
@@ -130,13 +130,14 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
return
NULL
;
}
static
void
tscInitCorVgroupInfo
(
SCorVgroupInfo
*
corVgroupInfo
,
SVgroup
Info
*
vgroupInfo
)
{
static
void
tscInitCorVgroupInfo
(
SCorVgroupInfo
*
corVgroupInfo
,
SVgroup
Msg
*
pVgroupMsg
)
{
corVgroupInfo
->
version
=
0
;
corVgroupInfo
->
inUse
=
0
;
corVgroupInfo
->
numOfEps
=
vgroupInfo
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
corVgroupInfo
->
numOfEps
;
i
++
)
{
corVgroupInfo
->
epAddr
[
i
].
fqdn
=
strdup
(
vgroupInfo
->
epAddr
[
i
].
fqdn
);
corVgroupInfo
->
epAddr
[
i
].
port
=
vgroupInfo
->
epAddr
[
i
].
port
;
corVgroupInfo
->
inUse
=
0
;
corVgroupInfo
->
numOfEps
=
pVgroupMsg
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
pVgroupMsg
->
numOfEps
;
i
++
)
{
corVgroupInfo
->
epAddr
[
i
].
fqdn
=
strndup
(
pVgroupMsg
->
epAddr
[
i
].
fqdn
,
tListLen
(
pVgroupMsg
->
epAddr
[
0
].
fqdn
));
corVgroupInfo
->
epAddr
[
i
].
port
=
pVgroupMsg
->
epAddr
[
i
].
port
;
}
}
...
...
@@ -145,8 +146,10 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
int32_t
schemaSize
=
(
pTableMetaMsg
->
numOfColumns
+
pTableMetaMsg
->
numOfTags
)
*
sizeof
(
SSchema
);
STableMeta
*
pTableMeta
=
calloc
(
1
,
sizeof
(
STableMeta
)
+
schemaSize
);
pTableMeta
->
tableType
=
pTableMetaMsg
->
tableType
;
pTableMeta
->
vgId
=
pTableMetaMsg
->
vgroup
.
vgId
;
pTableMeta
->
tableInfo
=
(
STableComInfo
)
{
.
numOfTags
=
pTableMetaMsg
->
numOfTags
,
.
precision
=
pTableMetaMsg
->
precision
,
...
...
@@ -156,18 +159,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta
->
id
.
tid
=
pTableMetaMsg
->
tid
;
pTableMeta
->
id
.
uid
=
pTableMetaMsg
->
uid
;
SVgroupInfo
*
pVgroupInfo
=
&
pTableMeta
->
vgroupInfo
;
pVgroupInfo
->
numOfEps
=
pTableMetaMsg
->
vgroup
.
numOfEps
;
pVgroupInfo
->
vgId
=
pTableMetaMsg
->
vgroup
.
vgId
;
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfEps
;
++
i
)
{
SEpAddrMsg
*
pEpMsg
=
&
pTableMetaMsg
->
vgroup
.
epAddr
[
i
];
pVgroupInfo
->
epAddr
[
i
].
fqdn
=
strndup
(
pEpMsg
->
fqdn
,
tListLen
(
pEpMsg
->
fqdn
));
pVgroupInfo
->
epAddr
[
i
].
port
=
pEpMsg
->
port
;
}
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
pVgroupInfo
);
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
&
pTableMetaMsg
->
vgroup
);
pTableMeta
->
sversion
=
pTableMetaMsg
->
sversion
;
pTableMeta
->
tversion
=
pTableMetaMsg
->
tversion
;
...
...
src/client/src/tscServer.c
浏览文件 @
ac319773
...
...
@@ -45,32 +45,30 @@ static int32_t getWaitingTimeInterval(int32_t count) {
return
0
;
}
return
initial
*
(
2
<<
(
count
-
2
));
return
initial
*
(
(
2u
)
<<
(
count
-
2
));
}
static
void
tscSetDnodeEpSet
(
SSqlObj
*
pSql
,
SVgroupInfo
*
pVgroupInfo
)
{
assert
(
pSql
!=
NULL
&&
pVgroupInfo
!=
NULL
&&
pVgroupInfo
->
numOfEps
>
0
);
SRpcEpSet
*
pEpSet
=
&
pSql
->
epSet
;
static
void
tscSetDnodeEpSet
(
SRpcEpSet
*
pEpSet
,
SVgroupInfo
*
pVgroupInfo
)
{
assert
(
pEpSet
!=
NULL
&&
pVgroupInfo
!=
NULL
&&
pVgroupInfo
->
numOfEps
>
0
);
// Issue the query to one of the vnode among a vgroup randomly.
// change the inUse property would not affect the isUse attribute of STableMeta
pEpSet
->
inUse
=
rand
()
%
pVgroupInfo
->
numOfEps
;
// apply the FQDN string length check here
bool
hasFqdn
=
false
;
bool
existed
=
false
;
pEpSet
->
numOfEps
=
pVgroupInfo
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
pVgroupInfo
->
numOfEps
;
++
i
)
{
tstrncpy
(
pEpSet
->
fqdn
[
i
],
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
tListLen
(
pEpSet
->
fqdn
[
i
]));
pEpSet
->
port
[
i
]
=
pVgroupInfo
->
epAddr
[
i
].
port
;
if
(
!
hasFqdn
)
{
hasFqdn
=
(
strlen
(
pEpSet
->
fqdn
[
i
])
>
0
);
int32_t
len
=
(
int32_t
)
strnlen
(
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
if
(
len
>
0
)
{
tstrncpy
(
pEpSet
->
fqdn
[
i
],
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
tListLen
(
pEpSet
->
fqdn
[
i
]));
existed
=
true
;
}
}
assert
(
hasFqdn
);
assert
(
existed
);
}
static
void
tscDumpMgmtEpSet
(
SSqlObj
*
pSql
)
{
...
...
@@ -102,7 +100,8 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) {
pCorEpSet
->
epSet
=
*
pEpSet
;
taosCorEndWrite
(
&
pCorEpSet
->
version
);
}
static
void
tscDumpEpSetFromVgroupInfo
(
SCorVgroupInfo
*
pVgroupInfo
,
SRpcEpSet
*
pEpSet
)
{
static
void
tscDumpEpSetFromVgroupInfo
(
SRpcEpSet
*
pEpSet
,
SCorVgroupInfo
*
pVgroupInfo
)
{
if
(
pVgroupInfo
==
NULL
)
{
return
;}
taosCorBeginRead
(
&
pVgroupInfo
->
version
);
int8_t
inUse
=
pVgroupInfo
->
inUse
;
...
...
@@ -515,8 +514,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
else
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vg
roupInfo
.
vg
Id
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d"
,
pSql
,
pTableMeta
->
vg
roupInfo
.
vg
Id
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d"
,
pSql
,
pTableMeta
->
vgId
);
}
pSql
->
cmd
.
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
...
...
@@ -535,7 +534,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// NOTE: shell message size should not include SMsgDesc
int32_t
size
=
pSql
->
cmd
.
payloadLen
-
sizeof
(
SMsgDesc
);
int32_t
vgId
=
pTableMeta
->
vgroupInfo
.
vgId
;
SMsgDesc
*
pMsgDesc
=
(
SMsgDesc
*
)
pMsg
;
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
// always one vnode
...
...
@@ -543,7 +541,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg
+=
sizeof
(
SMsgDesc
);
SSubmitMsg
*
pShellMsg
=
(
SSubmitMsg
*
)
pMsg
;
pShellMsg
->
header
.
vgId
=
htonl
(
vgId
);
pShellMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pShellMsg
->
header
.
contLen
=
htonl
(
size
);
// the length not includes the size of SMsgDesc
pShellMsg
->
length
=
pShellMsg
->
header
.
contLen
;
...
...
@@ -551,9 +549,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload
pSql
->
cmd
.
msgType
=
TSDB_MSG_TYPE_SUBMIT
;
tscDumpEpSetFromVgroupInfo
(
&
p
TableMeta
->
corVgroupInfo
,
&
pSql
->
epSet
);
tscDumpEpSetFromVgroupInfo
(
&
p
Sql
->
epSet
,
&
pTableMeta
->
corVgroupInfo
);
tscDebug
(
"%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
,
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
tscDebug
(
"%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d"
,
pSql
,
pTableMeta
->
vgId
,
pSql
->
cmd
.
numOfTablesInSubmit
,
pSql
->
epSet
.
numOfEps
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -597,24 +595,28 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
UTIL_TABLE_IS_NORMAL_TABLE
(
pTableMetaInfo
)
||
pTableMetaInfo
->
pVgroupTables
==
NULL
)
{
SVgroupInfo
*
pVgroupInfo
=
NULL
;
int32_t
vgId
=
-
1
;
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
int32_t
index
=
pTableMetaInfo
->
vgroupIndex
;
assert
(
index
>=
0
);
SVgroupInfo
*
pVgroupInfo
=
NULL
;
if
(
pTableMetaInfo
->
vgroupList
->
numOfVgroups
>
0
)
{
assert
(
index
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
index
];
}
vgId
=
pVgroupInfo
->
vgId
;
tscSetDnodeEpSet
(
&
pSql
->
epSet
,
pVgroupInfo
);
tscDebug
(
"%p query on stable, vgIndex:%d, numOfVgroups:%d"
,
pSql
,
index
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
}
else
{
pVgroupInfo
=
&
pTableMeta
->
vgroupInfo
;
vgId
=
pTableMeta
->
vgId
;
tscDumpEpSetFromVgroupInfo
(
&
pSql
->
epSet
,
&
pTableMeta
->
corVgroupInfo
);
}
assert
(
pVgroupInfo
!=
NULL
)
;
pSql
->
epSet
.
inUse
=
rand
()
%
pSql
->
epSet
.
numOfEps
;
tscSetDnodeEpSet
(
pSql
,
pVgroupInfo
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pVgroupInfo
->
vgId
);
pQueryMsg
->
head
.
vgId
=
htonl
(
vgId
);
STableIdInfo
*
pTableIdInfo
=
(
STableIdInfo
*
)
pMsg
;
pTableIdInfo
->
tid
=
htonl
(
pTableMeta
->
id
.
tid
);
...
...
@@ -633,7 +635,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
SVgroupTableInfo
*
pTableIdList
=
taosArrayGet
(
pTableMetaInfo
->
pVgroupTables
,
index
);
// set the vgroup info
tscSetDnodeEpSet
(
pSql
,
&
pTableIdList
->
vgInfo
);
tscSetDnodeEpSet
(
&
pSql
->
epSet
,
&
pTableIdList
->
vgInfo
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pTableIdList
->
vgInfo
.
vgId
);
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pTableIdList
->
itemList
);
...
...
@@ -1448,48 +1450,11 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
tscDumpEpSetFromVgroupInfo
(
&
p
TableMetaInfo
->
pTableMeta
->
corVgroupInfo
,
&
pSql
->
epSet
);
tscDumpEpSetFromVgroupInfo
(
&
p
Sql
->
epSet
,
&
pTableMetaInfo
->
pTableMeta
->
corVgroupInfo
);
return
TSDB_CODE_SUCCESS
;
}
//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
// pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
//
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// int32_t vgIndex = pTableMetaInfo->vgroupIndex;
// if (pTableMetaInfo->pVgroupTables == NULL) {
// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
//
// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
// } else {
// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
// assert(vgIndex >= 0 && vgIndex < numOfVgroups);
//
// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
//
// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
// }
// } else {
// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
// }
//
// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
//
// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
// return TSDB_CODE_SUCCESS;
//}
int
tscAlterDbMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
payloadLen
=
sizeof
(
SAlterDbMsg
);
...
...
src/client/src/tscSystem.c
浏览文件 @
ac319773
...
...
@@ -31,17 +31,16 @@
#include "tlocale.h"
// global, not configurable
SCacheObj
*
tscMetaCache
;
SCacheObj
*
tscMetaCache
;
// table meta cache
SHashObj
*
tscHashMap
;
// hash map to keep the global vgroup info
int
tscObjRef
=
-
1
;
void
*
tscTmr
;
void
*
tscQhandle
;
void
*
tscCheckDiskUsageTmr
;
void
*
tscTmr
;
void
*
tscQhandle
;
void
*
tscCheckDiskUsageTmr
;
int
tscRefId
=
-
1
;
int
tscNumOfThreads
;
int
tscNumOfObj
=
0
;
// number of sqlObj in current process.
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
void
tscCheckDiskUsage
(
void
*
UNUSED_PARAM
(
para
),
void
*
UNUSED_PARAM
(
param
))
{
taosGetDisk
();
...
...
@@ -114,7 +113,7 @@ void taos_init_imp(void) {
int
queueSize
=
tsMaxConnections
*
2
;
double
factor
=
(
tscEmbedded
==
0
)
?
2
.
0
:
4
.
0
;
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
int32_t
tscNumOfThreads
=
(
int
)(
tsNumOfCores
*
tsNumOfThreadsPerCore
/
factor
);
if
(
tscNumOfThreads
<
2
)
{
tscNumOfThreads
=
2
;
}
...
...
@@ -133,7 +132,8 @@ void taos_init_imp(void) {
int64_t
refreshTime
=
10
;
// 10 seconds by default
if
(
tscMetaCache
==
NULL
)
{
tscMetaCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
refreshTime
,
false
,
tscFreeTableMetaHelper
,
"tableMeta"
);
tscObjRef
=
taosOpenRef
(
40960
,
tscFreeRegisteredSqlObj
);
tscObjRef
=
taosOpenRef
(
40960
,
tscFreeRegisteredSqlObj
);
tscHashMap
=
taosHashInit
(
1024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
}
tscRefId
=
taosOpenRef
(
200
,
tscCloseTscObj
);
...
...
src/client/src/tscUtil.c
浏览文件 @
ac319773
...
...
@@ -458,21 +458,19 @@ void tscFreeRegisteredSqlObj(void *pSql) {
SSqlObj
*
p
=
*
(
SSqlObj
**
)
pSql
;
STscObj
*
pTscObj
=
p
->
pTscObj
;
assert
(
p
->
self
!=
0
);
assert
(
RID_VALID
(
p
->
self
));
tscFreeSqlObj
(
p
);
taosReleaseRef
(
tscRefId
,
pTscObj
->
rid
);
int32_t
num
=
atomic_sub_fetch_32
(
&
pTscObj
->
numOfObj
,
1
);
int32_t
total
=
atomic_sub_fetch_32
(
&
tscNumOfObj
,
1
);
tscDebug
(
"%p free SqlObj, total in tscObj:%d, total:%d"
,
pSql
,
num
,
total
);
}
void
tscFreeTableMetaHelper
(
void
*
pTableMeta
)
{
STableMeta
*
p
=
(
STableMeta
*
)
pTableMeta
;
int32_t
numOfEps
=
p
->
vgroupInfo
.
numOfEps
;
assert
(
numOfEps
>=
0
&&
numOfEps
<=
TSDB_MAX_REPLICA
);
for
(
int32_t
i
=
0
;
i
<
numOfEps
;
++
i
)
{
tfree
(
p
->
vgroupInfo
.
epAddr
[
i
].
fqdn
);
}
int32_t
numOfEps1
=
p
->
corVgroupInfo
.
numOfEps
;
assert
(
numOfEps1
>=
0
&&
numOfEps1
<=
TSDB_MAX_REPLICA
);
...
...
@@ -1912,6 +1910,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
void
registerSqlObj
(
SSqlObj
*
pSql
)
{
taosAcquireRef
(
tscRefId
,
pSql
->
pTscObj
->
rid
);
pSql
->
self
=
taosAddRef
(
tscObjRef
,
pSql
);
int32_t
num
=
atomic_add_fetch_32
(
&
pSql
->
pTscObj
->
numOfObj
,
1
);
int32_t
total
=
atomic_add_fetch_32
(
&
tscNumOfObj
,
1
);
tscDebug
(
"%p new SqlObj from %p, total in tscObj:%d, total:%d"
,
pSql
,
pSql
->
pTscObj
,
num
,
total
);
}
SSqlObj
*
createSimpleSubObj
(
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
int32_t
cmd
)
{
...
...
@@ -1941,30 +1943,24 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
return
NULL
;
}
pNew
->
fp
=
fp
;
pNew
->
fp
=
fp
;
pNew
->
fetchFp
=
fp
;
pNew
->
param
=
param
;
pNew
->
param
=
param
;
pNew
->
sqlstr
=
NULL
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
pNew
->
sqlstr
=
strdup
(
pSql
->
sqlstr
);
if
(
pNew
->
sqlstr
==
NULL
)
{
tscError
(
"%p new subquery failed"
,
pSql
);
tscFreeSqlObj
(
pNew
);
return
NULL
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetailSafely
(
pCmd
,
0
);
assert
(
pSql
->
cmd
.
clauseIndex
==
0
);
STableMetaInfo
*
pMasterTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
,
0
);
tscAddTableMetaInfo
(
pQueryInfo
,
pMasterTableMetaInfo
->
name
,
NULL
,
NULL
,
NULL
,
NULL
);
registerSqlObj
(
pNew
);
return
pNew
;
}
static
void
doSetSqlExprAndResultFieldInfo
(
SQueryInfo
*
p
QueryInfo
,
SQueryInfo
*
p
NewQueryInfo
,
int64_t
uid
)
{
static
void
doSetSqlExprAndResultFieldInfo
(
SQueryInfo
*
pNewQueryInfo
,
int64_t
uid
)
{
int32_t
numOfOutput
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pNewQueryInfo
);
if
(
numOfOutput
==
0
)
{
return
;
...
...
@@ -2017,15 +2013,9 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
tableIndex
);
pNew
->
pTscObj
=
pSql
->
pTscObj
;
pNew
->
pTscObj
=
pSql
->
pTscObj
;
pNew
->
signature
=
pNew
;
pNew
->
sqlstr
=
strdup
(
pSql
->
sqlstr
);
if
(
pNew
->
sqlstr
==
NULL
)
{
tscError
(
"%p new subquery failed, tableIndex:%d, vgroupIndex:%d"
,
pSql
,
tableIndex
,
pTableMetaInfo
->
vgroupIndex
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
pNew
->
sqlstr
=
NULL
;
SSqlCmd
*
pnCmd
=
&
pNew
->
cmd
;
memcpy
(
pnCmd
,
pCmd
,
sizeof
(
SSqlCmd
));
...
...
@@ -2113,23 +2103,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
goto
_error
;
}
doSetSqlExprAndResultFieldInfo
(
p
QueryInfo
,
p
NewQueryInfo
,
uid
);
doSetSqlExprAndResultFieldInfo
(
pNewQueryInfo
,
uid
);
pNew
->
fp
=
fp
;
pNew
->
fp
=
fp
;
pNew
->
fetchFp
=
fp
;
pNew
->
param
=
param
;
pNew
->
param
=
param
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
char
*
name
=
pTableMetaInfo
->
name
;
STableMetaInfo
*
pFinalInfo
=
NULL
;
if
(
pPrevSql
==
NULL
)
{
STableMeta
*
pTableMeta
=
taosCacheAcquireByData
(
tscMetaCache
,
pTableMetaInfo
->
pTableMeta
);
// get by name may failed due to the cache cleanup
if
(
pPrevSql
==
NULL
)
{
// get by name may failed due to the cache cleanup
STableMeta
*
pTableMeta
=
taosCacheAcquireByData
(
tscMetaCache
,
pTableMetaInfo
->
pTableMeta
);
assert
(
pTableMeta
!=
NULL
);
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pTableMeta
,
pTableMetaInfo
->
vgroupList
,
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
}
else
{
// transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo
*
pPrevInfo
=
tscGetTableMetaInfoFromCmd
(
&
pPrevSql
->
cmd
,
pPrevSql
->
cmd
.
clauseIndex
,
0
);
...
...
src/query/inc/qUtil.h
浏览文件 @
ac319773
...
...
@@ -34,17 +34,13 @@ int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
resetResultRowInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
void
popFrontResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
num
);
void
clearClosedResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
removeRedundantResultRows
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
clearResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
);
void
copyResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
dst
,
const
SResultRow
*
src
,
int16_t
type
);
SResultRowCellInfo
*
getResultCell
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
const
SResultRow
*
pRow
,
int32_t
index
);
...
...
src/query/src/qExecutor.c
浏览文件 @
ac319773
...
...
@@ -753,11 +753,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
}
static
void
updateResultRowIndex
(
SResultRowInfo
*
pResultRowInfo
,
STableQueryInfo
*
pTableQueryInfo
,
bool
ascQuery
)
{
if
((
pTableQueryInfo
->
lastKey
>
=
pTableQueryInfo
->
win
.
ekey
&&
ascQuery
)
||
(
pTableQueryInfo
->
lastKey
<=
pTableQueryInfo
->
win
.
ekey
&&
(
!
ascQuery
)))
{
if
((
pTableQueryInfo
->
lastKey
>
pTableQueryInfo
->
win
.
ekey
&&
ascQuery
)
||
(
pTableQueryInfo
->
lastKey
<
pTableQueryInfo
->
win
.
ekey
&&
(
!
ascQuery
)))
{
closeAllResultRows
(
pResultRowInfo
);
pResultRowInfo
->
curIndex
=
pResultRowInfo
->
size
-
1
;
}
else
{
doUpdateResultRowIndex
(
pResultRowInfo
,
pTableQueryInfo
->
lastKey
,
ascQuery
);
int32_t
step
=
ascQuery
?
1
:-
1
;
doUpdateResultRowIndex
(
pResultRowInfo
,
pTableQueryInfo
->
lastKey
-
step
,
ascQuery
);
}
}
...
...
@@ -1198,8 +1199,12 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
// prev time window not interpolation yet.
int32_t
curIndex
=
curTimeWindowIndex
(
pWindowResInfo
);
if
(
prevIndex
!=
-
1
&&
prevIndex
<
curIndex
&&
pRuntimeEnv
->
timeWindowInterpo
)
{
for
(
int32_t
j
=
prevIndex
;
j
<
curIndex
;
++
j
)
{
for
(
int32_t
j
=
prevIndex
;
j
<
curIndex
;
++
j
)
{
// previous time window may be all closed already.
SResultRow
*
pRes
=
pWindowResInfo
->
pResult
[
j
];
if
(
pRes
->
closed
)
{
assert
(
resultRowInterpolated
(
pRes
,
RESULT_ROW_START_INTERP
)
&&
resultRowInterpolated
(
pRes
,
RESULT_ROW_END_INTERP
));
continue
;
}
STimeWindow
w
=
pRes
->
win
;
ret
=
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
&
w
,
masterScan
,
&
pResult
,
groupId
);
...
...
@@ -1600,6 +1605,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if
(
prevWindowIndex
!=
-
1
&&
prevWindowIndex
<
curIndex
)
{
for
(
int32_t
k
=
prevWindowIndex
;
k
<
curIndex
;
++
k
)
{
SResultRow
*
pRes
=
pWindowResInfo
->
pResult
[
k
];
if
(
pRes
->
closed
)
{
assert
(
resultRowInterpolated
(
pResult
,
RESULT_ROW_START_INTERP
)
&&
resultRowInterpolated
(
pResult
,
RESULT_ROW_END_INTERP
));
continue
;
}
ret
=
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
&
pRes
->
win
,
masterScan
,
&
pResult
,
groupId
);
assert
(
ret
==
TSDB_CODE_SUCCESS
&&
!
resultRowInterpolated
(
pResult
,
RESULT_ROW_END_INTERP
));
...
...
@@ -1713,10 +1722,6 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
blockwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pResultRowInfo
,
searchFn
,
pDataBlock
);
}
// update the lastkey of current table
TSKEY
lastKey
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pDataBlockInfo
->
window
.
ekey
:
pDataBlockInfo
->
window
.
skey
;
pTableQueryInfo
->
lastKey
=
lastKey
+
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
// interval query with limit applied
int32_t
numOfRes
=
0
;
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
pRuntimeEnv
->
groupbyNormalCol
)
{
...
...
@@ -5181,10 +5186,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
scanMultiTableDataBlocks
(
pQInfo
);
pQInfo
->
groupIndex
+=
1
;
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
taosArrayDestroy
(
s
)
;
// no results generated for current group, continue to try the next group
taosArrayDestroy
(
s
)
;
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
pWindowResInfo
->
size
<=
0
)
{
continue
;
}
...
...
@@ -5211,8 +5216,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQInfo
->
groupIndex
=
currentGroupIndex
;
// restore the group index
assert
(
pQuery
->
rec
.
rows
==
pWindowResInfo
->
size
);
clearClosedResultRows
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
resetResultRowInfo
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
break
;
}
}
else
if
(
pRuntimeEnv
->
queryWindowIdentical
&&
pRuntimeEnv
->
pTsBuf
==
NULL
&&
!
isTSCompQuery
(
pQuery
))
{
...
...
src/query/src/qUtil.c
浏览文件 @
ac319773
...
...
@@ -20,18 +20,6 @@
#include "qExecutor.h"
#include "qUtil.h"
static
int32_t
getResultRowKeyInfo
(
SResultRow
*
pResult
,
int16_t
type
,
char
**
key
,
int16_t
*
bytes
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
*
key
=
varDataVal
(
pResult
->
key
);
*
bytes
=
varDataLen
(
pResult
->
key
);
}
else
{
*
key
=
(
char
*
)
&
pResult
->
win
.
skey
;
*
bytes
=
tDataTypeDesc
[
type
].
nSize
;
}
return
0
;
}
int32_t
getOutputInterResultBufSize
(
SQuery
*
pQuery
)
{
int32_t
size
=
0
;
...
...
@@ -99,73 +87,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
}
void
popFrontResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
num
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
||
pResultRowInfo
->
size
==
0
||
num
==
0
)
{
return
;
}
int32_t
numOfClosed
=
numOfClosedResultRows
(
pResultRowInfo
);
assert
(
num
>=
0
&&
num
<=
numOfClosed
);
int16_t
type
=
pResultRowInfo
->
type
;
int64_t
uid
=
0
;
char
*
key
=
NULL
;
int16_t
bytes
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SResultRow
*
pResult
=
pResultRowInfo
->
pResult
[
i
];
if
(
pResult
->
closed
)
{
// remove the window slot from hash table
getResultRowKeyInfo
(
pResult
,
type
,
&
key
,
&
bytes
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
taosHashRemove
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
}
else
{
break
;
}
}
int32_t
remain
=
pResultRowInfo
->
size
-
num
;
// clear all the closed windows from the window list
for
(
int32_t
k
=
0
;
k
<
remain
;
++
k
)
{
copyResultRow
(
pRuntimeEnv
,
pResultRowInfo
->
pResult
[
k
],
pResultRowInfo
->
pResult
[
num
+
k
],
type
);
}
// move the unclosed window in the front of the window list
for
(
int32_t
k
=
remain
;
k
<
pResultRowInfo
->
size
;
++
k
)
{
SResultRow
*
pWindowRes
=
pResultRowInfo
->
pResult
[
k
];
clearResultRow
(
pRuntimeEnv
,
pWindowRes
,
pResultRowInfo
->
type
);
}
pResultRowInfo
->
size
=
remain
;
for
(
int32_t
k
=
0
;
k
<
pResultRowInfo
->
size
;
++
k
)
{
SResultRow
*
pResult
=
pResultRowInfo
->
pResult
[
k
];
getResultRowKeyInfo
(
pResult
,
type
,
&
key
,
&
bytes
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
assert
(
p
!=
NULL
);
int32_t
v
=
(
*
p
-
num
);
assert
(
v
>=
0
&&
v
<=
pResultRowInfo
->
size
);
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
key
,
bytes
,
uid
);
taosHashPut
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
pResultRowInfo
->
curIndex
=
-
1
;
}
void
clearClosedResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
||
pResultRowInfo
->
size
==
0
)
{
return
;
}
int32_t
numOfClosed
=
numOfClosedResultRows
(
pResultRowInfo
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
numOfClosed
);
}
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
int32_t
i
=
0
;
while
(
i
<
pResultRowInfo
->
size
&&
pResultRowInfo
->
pResult
[
i
]
->
closed
)
{
...
...
@@ -188,40 +109,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
}
}
/*
* remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order
*/
void
removeRedundantResultRows
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
)
{
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
if
(
pResultRowInfo
->
size
<=
1
)
{
return
;
}
// get the result order
int32_t
resultOrder
=
(
pResultRowInfo
->
pResult
[
0
]
->
win
.
skey
<
pResultRowInfo
->
pResult
[
1
]
->
win
.
skey
)
?
1
:-
1
;
if
(
order
!=
resultOrder
)
{
return
;
}
int32_t
i
=
0
;
if
(
order
==
QUERY_ASC_FORWARD_STEP
)
{
TSKEY
ekey
=
pResultRowInfo
->
pResult
[
i
]
->
win
.
ekey
;
while
(
i
<
pResultRowInfo
->
size
&&
(
ekey
<
lastKey
))
{
++
i
;
}
}
else
if
(
order
==
QUERY_DESC_FORWARD_STEP
)
{
while
(
i
<
pResultRowInfo
->
size
&&
(
pResultRowInfo
->
pResult
[
i
]
->
win
.
skey
>
lastKey
))
{
++
i
;
}
}
if
(
i
<
pResultRowInfo
->
size
)
{
pResultRowInfo
->
size
=
(
i
+
1
);
}
}
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
return
(
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
==
true
);
}
...
...
@@ -262,47 +149,6 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
}
}
/**
* The source window result pos attribution of the source window result does not assign to the destination,
* since the attribute of "Pos" is bound to each window result when the window result is created in the
* disk-based result buffer.
*/
void
copyResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
dst
,
const
SResultRow
*
src
,
int16_t
type
)
{
dst
->
numOfRows
=
src
->
numOfRows
;
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
dst
->
key
=
realloc
(
dst
->
key
,
varDataTLen
(
src
->
key
));
varDataCopy
(
dst
->
key
,
src
->
key
);
}
else
{
dst
->
win
=
src
->
win
;
}
dst
->
closed
=
src
->
closed
;
int32_t
nOutputCols
=
pRuntimeEnv
->
pQuery
->
numOfOutput
;
for
(
int32_t
i
=
0
;
i
<
nOutputCols
;
++
i
)
{
SResultRowCellInfo
*
pDst
=
getResultCell
(
pRuntimeEnv
,
dst
,
i
);
SResultRowCellInfo
*
pSrc
=
getResultCell
(
pRuntimeEnv
,
src
,
i
);
// char *buf = pDst->interResultBuf;
memcpy
(
pDst
,
pSrc
,
sizeof
(
SResultRowCellInfo
)
+
pRuntimeEnv
->
pCtx
[
i
].
interBufBytes
);
// pDst->interResultBuf = buf; // restore the allocated buffer
// copy the result info struct
// memcpy(pDst->interResultBuf, pSrc->interResultBuf, pRuntimeEnv->pCtx[i].interBufBytes);
// copy the output buffer data from src to dst, the position info keep unchanged
tFilePage
*
dstpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
dst
->
pageId
);
char
*
dstBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
dst
,
dstpage
);
tFilePage
*
srcpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
src
->
pageId
);
char
*
srcBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
(
SResultRow
*
)
src
,
srcpage
);
size_t
s
=
pRuntimeEnv
->
pQuery
->
pExpr1
[
i
].
bytes
;
memcpy
(
dstBuf
,
srcBuf
,
s
);
}
}
SResultRowCellInfo
*
getResultCell
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
const
SResultRow
*
pRow
,
int32_t
index
)
{
assert
(
index
>=
0
&&
index
<
pRuntimeEnv
->
pQuery
->
numOfOutput
);
return
(
SResultRowCellInfo
*
)((
char
*
)
pRow
->
pCellInfo
+
pRuntimeEnv
->
rowCellInfoOffset
[
index
]);
...
...
src/util/inc/hash.h
浏览文件 @
ac319773
...
...
@@ -32,17 +32,18 @@ typedef void (*_hash_free_fn_t)(void *param);
typedef
struct
SHashNode
{
struct
SHashNode
*
next
;
uint32_t
hashVal
;
// the hash value of key
uint32_t
keyLen
;
// length of the key
size_t
dataLen
;
// length of data
int8_t
count
;
// reference count
int8_t
removed
;
// flag to indicate removed
uint32_t
hashVal
;
// the hash value of key
uint32_t
dataLen
;
// length of data
uint32_t
keyLen
;
// length of the key
int8_t
removed
;
// flag to indicate removed
int8_t
count
;
// reference count
char
data
[];
}
SHashNode
;
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((char*)(_n) - sizeof(SHashNode));
typedef
enum
SHashLockTypeE
{
HASH_NO_LOCK
=
0
,
HASH_ENTRY_LOCK
=
1
,
...
...
@@ -115,7 +116,7 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
* @param dsize
* @return
*/
void
*
taosHashGetC
B
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
(
*
fp
)(
void
*
),
void
*
d
,
size_t
dsize
);
void
*
taosHashGetC
lone
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
(
*
fp
)(
void
*
),
void
*
d
,
size_t
dsize
);
/**
* remove item with the specified key
...
...
src/util/src/hash.c
浏览文件 @
ac319773
...
...
@@ -271,10 +271,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
}
void
*
taosHashGet
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
return
taosHashGetC
B
(
pHashObj
,
key
,
keyLen
,
NULL
,
NULL
,
0
);
return
taosHashGetC
lone
(
pHashObj
,
key
,
keyLen
,
NULL
,
NULL
,
0
);
}
void
*
taosHashGetC
B
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
(
*
fp
)(
void
*
),
void
*
d
,
size_t
dsize
)
{
void
*
taosHashGetC
lone
(
SHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
(
*
fp
)(
void
*
),
void
*
d
,
size_t
dsize
)
{
if
(
pHashObj
->
size
<=
0
||
keyLen
==
0
||
key
==
NULL
)
{
return
NULL
;
}
...
...
@@ -654,7 +654,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
pNewNode
->
keyLen
=
(
uint32_t
)
keyLen
;
pNewNode
->
hashVal
=
hashVal
;
pNewNode
->
dataLen
=
dsize
;
pNewNode
->
dataLen
=
(
uint32_t
)
dsize
;
pNewNode
->
count
=
1
;
memcpy
(
GET_HASH_NODE_DATA
(
pNewNode
),
pData
,
dsize
);
...
...
src/util/src/tcache.c
浏览文件 @
ac319773
...
...
@@ -278,7 +278,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
}
SCacheDataNode
*
ptNode
=
NULL
;
taosHashGetC
B
(
pCacheObj
->
pHashTable
,
key
,
keyLen
,
incRefFn
,
&
ptNode
,
sizeof
(
void
*
));
taosHashGetC
lone
(
pCacheObj
->
pHashTable
,
key
,
keyLen
,
incRefFn
,
&
ptNode
,
sizeof
(
void
*
));
void
*
pData
=
(
ptNode
!=
NULL
)
?
ptNode
->
data
:
NULL
;
...
...
src/vnode/src/vnodeMgmt.c
浏览文件 @
ac319773
...
...
@@ -91,7 +91,7 @@ static void vnodeIncRef(void *ptNode) {
void
*
vnodeAcquire
(
int32_t
vgId
)
{
SVnodeObj
**
ppVnode
=
NULL
;
if
(
tsVnodesHash
!=
NULL
)
{
ppVnode
=
taosHashGetC
B
(
tsVnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
NULL
,
sizeof
(
void
*
));
ppVnode
=
taosHashGetC
lone
(
tsVnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
NULL
,
sizeof
(
void
*
));
}
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录