Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d5255f1e
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d5255f1e
编写于
10月 13, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more format
上级
3950c8fc
变更
30
展开全部
隐藏空白更改
内联
并排
Showing
30 changed file
with
1237 addition
and
1200 deletion
+1237
-1200
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+382
-325
source/libs/catalog/inc/ctgRemote.h
source/libs/catalog/inc/ctgRemote.h
+0
-2
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+88
-83
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+1
-1
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+65
-61
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+179
-221
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+149
-166
source/libs/command/test/commandTest.cpp
source/libs/command/test/commandTest.cpp
+2
-2
source/libs/executor/inc/dataSinkInt.h
source/libs/executor/inc/dataSinkInt.h
+11
-9
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+4
-4
source/libs/executor/inc/tlinearhash.h
source/libs/executor/inc/tlinearhash.h
+5
-5
source/libs/executor/inc/tsimplehash.h
source/libs/executor/inc/tsimplehash.h
+1
-1
source/libs/executor/inc/tsort.h
source/libs/executor/inc/tsort.h
+16
-14
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+29
-27
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+16
-15
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+40
-40
source/libs/executor/src/dataSinkMgt.c
source/libs/executor/src/dataSinkMgt.c
+7
-9
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+10
-9
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+2
-1
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+2
-2
source/libs/executor/src/tlinearhash.c
source/libs/executor/src/tlinearhash.c
+46
-48
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+88
-78
source/libs/executor/test/lhashTests.cpp
source/libs/executor/test/lhashTests.cpp
+11
-10
source/libs/executor/test/sortTests.cpp
source/libs/executor/test/sortTests.cpp
+40
-40
source/libs/executor/test/tSimpleHashTests.cpp
source/libs/executor/test/tSimpleHashTests.cpp
+3
-7
source/libs/tfs/inc/tfsInt.h
source/libs/tfs/inc/tfsInt.h
+16
-14
source/libs/tfs/src/tfs.c
source/libs/tfs/src/tfs.c
+2
-2
source/libs/tfs/test/tfsTest.cpp
source/libs/tfs/test/tfsTest.cpp
+2
-2
tools/scripts/codeFormat.sh
tools/scripts/codeFormat.sh
+19
-1
未找到文件。
source/libs/catalog/inc/catalogInt.h
浏览文件 @
d5255f1e
此差异已折叠。
点击以展开。
source/libs/catalog/inc/ctgRemote.h
浏览文件 @
d5255f1e
...
...
@@ -20,8 +20,6 @@
extern
"C"
{
#endif
#ifdef __cplusplus
}
#endif
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
d5255f1e
此差异已折叠。
点击以展开。
source/libs/catalog/src/ctgAsync.c
浏览文件 @
d5255f1e
...
...
@@ -636,7 +636,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
taosAcquireRef
(
gCtgMgmt
.
jobPool
,
pJob
->
refId
);
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
qDebug
(
"QID:0x%"
PRIx64
", jobId: 0x%"
PRIx64
" initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms"
,
pJob
->
queryId
,
pJob
->
refId
,
taskNum
,
pReq
->
forceUpdate
,
el
);
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
d5255f1e
...
...
@@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
trpc
.h"
#include "
catalogInt
.h"
#include "query.h"
#include "tname.h"
#include "
catalogInt
.h"
#include "
trpc
.h"
extern
SCatalogMgmt
gCtgMgmt
;
SCtgDebug
gCTGDebug
=
{
0
};
SCtgDebug
gCTGDebug
=
{
0
};
void
ctgdUserCallback
(
SMetaData
*
pResult
,
void
*
param
,
int32_t
code
)
{
ASSERT
(
*
(
int32_t
*
)
param
==
1
);
void
ctgdUserCallback
(
SMetaData
*
pResult
,
void
*
param
,
int32_t
code
)
{
ASSERT
(
*
(
int32_t
*
)
param
==
1
);
taosMemoryFree
(
param
);
qDebug
(
"async call result: %s"
,
tstrerror
(
code
));
...
...
@@ -36,16 +36,19 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if
(
pResult
->
pTableMeta
&&
taosArrayGetSize
(
pResult
->
pTableMeta
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pTableMeta
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STableMeta
*
p
=
*
(
STableMeta
**
)
taosArrayGet
(
pResult
->
pTableMeta
,
i
);
STableMeta
*
p
=
*
(
STableMeta
**
)
taosArrayGet
(
pResult
->
pTableMeta
,
i
);
STableComInfo
*
c
=
&
p
->
tableInfo
;
if
(
TSDB_CHILD_TABLE
==
p
->
tableType
)
{
qDebug
(
"table meta: type:%d, vgId:%d, uid:0x%"
PRIx64
",suid:0x%"
PRIx64
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
qDebug
(
"table meta: type:%d, vgId:%d, uid:0x%"
PRIx64
",suid:0x%"
PRIx64
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
}
else
{
qDebug
(
"table meta: type:%d, vgId:%d, uid:0x%"
PRIx64
",suid:0x%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
qDebug
(
"table meta: type:%d, vgId:%d, uid:0x%"
PRIx64
",suid:0x%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
}
int32_t
colNum
=
c
->
numOfColumns
+
c
->
numOfTags
;
for
(
int32_t
j
=
0
;
j
<
colNum
;
++
j
)
{
SSchema
*
s
=
&
p
->
schema
[
j
];
...
...
@@ -59,11 +62,11 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if
(
pResult
->
pDbVgroup
&&
taosArrayGetSize
(
pResult
->
pDbVgroup
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pDbVgroup
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SArray
*
pDb
=
*
(
SArray
**
)
taosArrayGet
(
pResult
->
pDbVgroup
,
i
);
SArray
*
pDb
=
*
(
SArray
**
)
taosArrayGet
(
pResult
->
pDbVgroup
,
i
);
int32_t
vgNum
=
taosArrayGetSize
(
pDb
);
qDebug
(
"db %d vgInfo:"
,
i
);
for
(
int32_t
j
=
0
;
j
<
vgNum
;
++
j
)
{
SVgroupInfo
*
pInfo
=
taosArrayGet
(
pDb
,
j
);
SVgroupInfo
*
pInfo
=
taosArrayGet
(
pDb
,
j
);
qDebug
(
"vg :%d info: vgId:%d"
,
j
,
pInfo
->
vgId
);
}
}
...
...
@@ -84,7 +87,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if
(
pResult
->
pTableHash
&&
taosArrayGetSize
(
pResult
->
pTableHash
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pTableHash
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SVgroupInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pTableHash
,
i
);
SVgroupInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pTableHash
,
i
);
qDebug
(
"table %d vg info: vgId:%d"
,
i
,
pInfo
->
vgId
);
}
}
else
{
...
...
@@ -94,7 +97,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if
(
pResult
->
pUdfList
&&
taosArrayGetSize
(
pResult
->
pUdfList
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pUdfList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SFuncInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pUdfList
,
i
);
SFuncInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pUdfList
,
i
);
qDebug
(
"udf %d info: name:%s, funcType:%d"
,
i
,
pInfo
->
name
,
pInfo
->
funcType
);
}
}
else
{
...
...
@@ -104,35 +107,34 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if
(
pResult
->
pDbCfg
&&
taosArrayGetSize
(
pResult
->
pDbCfg
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pDbCfg
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SDbCfgInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pDbCfg
,
i
);
SDbCfgInfo
*
pInfo
=
taosArrayGet
(
pResult
->
pDbCfg
,
i
);
qDebug
(
"db %d info: numOFVgroups:%d, numOfStables:%d"
,
i
,
pInfo
->
numOfVgroups
,
pInfo
->
numOfStables
);
}
}
else
{
qDebug
(
"empty db cfg info"
);
}
}
if
(
pResult
->
pUser
&&
taosArrayGetSize
(
pResult
->
pUser
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pUser
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
bool
*
auth
=
taosArrayGet
(
pResult
->
pUser
,
i
);
bool
*
auth
=
taosArrayGet
(
pResult
->
pUser
,
i
);
qDebug
(
"user auth %d info: %d"
,
i
,
*
auth
);
}
}
else
{
qDebug
(
"empty user auth info"
);
}
}
if
(
pResult
->
pQnodeList
&&
taosArrayGetSize
(
pResult
->
pQnodeList
)
>
0
)
{
num
=
taosArrayGetSize
(
pResult
->
pQnodeList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SQueryNodeAddr
*
qaddr
=
taosArrayGet
(
pResult
->
pQnodeList
,
i
);
SQueryNodeAddr
*
qaddr
=
taosArrayGet
(
pResult
->
pQnodeList
,
i
);
qDebug
(
"qnode %d info: id:%d"
,
i
,
qaddr
->
nodeId
);
}
}
else
{
qDebug
(
"empty qnode info"
);
}
}
}
/*
prepare SQL:
create database db1;
...
...
@@ -147,8 +149,8 @@ grant write on db2.* to user1;
create function udf1 as '/tmp/libudf1.so' outputtype int;
create aggregate function udf2 as '/tmp/libudf2.so' outputtype int;
*/
int32_t
ctgdLaunchAsyncCall
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
uint64_t
reqId
,
bool
forceUpdate
)
{
int32_t
code
=
0
;
int32_t
ctgdLaunchAsyncCall
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
uint64_t
reqId
,
bool
forceUpdate
)
{
int32_t
code
=
0
;
SCatalogReq
req
=
{
0
};
req
.
pTableMeta
=
taosArrayInit
(
2
,
sizeof
(
SName
));
req
.
pDbVgroup
=
taosArrayInit
(
2
,
TSDB_DB_FNAME_LEN
);
...
...
@@ -156,16 +158,16 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re
req
.
pTableHash
=
taosArrayInit
(
2
,
sizeof
(
SName
));
req
.
pUdf
=
taosArrayInit
(
2
,
TSDB_FUNC_NAME_LEN
);
req
.
pDbCfg
=
taosArrayInit
(
2
,
TSDB_DB_FNAME_LEN
);
req
.
pIndex
=
NULL
;
//
taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
req
.
pIndex
=
NULL
;
//
taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
req
.
pUser
=
taosArrayInit
(
2
,
sizeof
(
SUserAuthInfo
));
req
.
qNodeRequired
=
true
;
req
.
forceUpdate
=
forceUpdate
;
SName
name
=
{
0
};
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
char
funcName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
SName
name
=
{
0
};
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
char
funcName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
SUserAuthInfo
user
=
{
0
};
tNameFromString
(
&
name
,
"1.db1.tb1"
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
taosArrayPush
(
req
.
pTableMeta
,
&
name
);
taosArrayPush
(
req
.
pTableHash
,
&
name
);
...
...
@@ -207,7 +209,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re
int32_t
*
param
=
taosMemoryCalloc
(
1
,
sizeof
(
int32_t
));
*
param
=
1
;
int64_t
jobId
=
0
;
CTG_ERR_JRET
(
catalogAsyncGetAllMeta
(
pCtg
,
pConn
,
&
req
,
ctgdUserCallback
,
param
,
&
jobId
));
...
...
@@ -221,7 +223,7 @@ _return:
taosArrayDestroy
(
req
.
pDbCfg
);
taosArrayDestroy
(
req
.
pUser
);
CTG_RET
(
code
);
CTG_RET
(
code
);
}
int32_t
ctgdEnableDebug
(
char
*
option
)
{
...
...
@@ -250,7 +252,7 @@ int32_t ctgdEnableDebug(char *option) {
}
qError
(
"invalid debug option:%s"
,
option
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
...
...
@@ -261,7 +263,7 @@ int32_t ctgdGetStatNum(char *option, void *res) {
}
qError
(
"invalid stat option:%s"
,
option
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
...
...
@@ -287,7 +289,7 @@ int32_t ctgdGetRentNum(SCtgRentMgmt *rent) {
return
num
;
}
int32_t
ctgdGetClusterCacheNum
(
SCatalog
*
pCtg
,
int32_t
type
)
{
int32_t
ctgdGetClusterCacheNum
(
SCatalog
*
pCtg
,
int32_t
type
)
{
if
(
NULL
==
pCtg
||
NULL
==
pCtg
->
dbCache
)
{
return
0
;
}
...
...
@@ -304,8 +306,8 @@ int32_t ctgdGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
}
SCtgDBCache
*
dbCache
=
NULL
;
int32_t
num
=
0
;
void
*
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
int32_t
num
=
0
;
void
*
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
while
(
pIter
)
{
dbCache
=
(
SCtgDBCache
*
)
pIter
;
switch
(
type
)
{
...
...
@@ -325,7 +327,7 @@ int32_t ctgdGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
return
num
;
}
void
ctgdShowTableMeta
(
SCatalog
*
pCtg
,
const
char
*
tbName
,
STableMeta
*
p
)
{
void
ctgdShowTableMeta
(
SCatalog
*
pCtg
,
const
char
*
tbName
,
STableMeta
*
p
)
{
if
(
!
gCTGDebug
.
metaEnable
)
{
return
;
}
...
...
@@ -333,11 +335,14 @@ void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
STableComInfo
*
c
=
&
p
->
tableInfo
;
if
(
TSDB_CHILD_TABLE
==
p
->
tableType
)
{
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:0x%"
PRIx64
",suid:0x%"
PRIx64
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:0x%"
PRIx64
",suid:0x%"
PRIx64
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
return
;
}
else
{
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:0x%"
PRIx64
",suid:0x%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:0x%"
PRIx64
",suid:0x%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
}
int32_t
colNum
=
c
->
numOfColumns
+
c
->
numOfTags
;
...
...
@@ -347,18 +352,18 @@ void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
}
}
void
ctgdShowDBCache
(
SCatalog
*
pCtg
,
SHashObj
*
dbHash
)
{
void
ctgdShowDBCache
(
SCatalog
*
pCtg
,
SHashObj
*
dbHash
)
{
if
(
NULL
==
dbHash
||
!
gCTGDebug
.
cacheEnable
)
{
return
;
}
int32_t
i
=
0
;
int32_t
i
=
0
;
SCtgDBCache
*
dbCache
=
NULL
;
void
*
pIter
=
taosHashIterate
(
dbHash
,
NULL
);
void
*
pIter
=
taosHashIterate
(
dbHash
,
NULL
);
while
(
pIter
)
{
char
*
dbFName
=
NULL
;
char
*
dbFName
=
NULL
;
size_t
len
=
0
;
dbCache
=
(
SCtgDBCache
*
)
pIter
;
dbFName
=
taosHashGetKey
(
pIter
,
&
len
);
...
...
@@ -380,29 +385,29 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
vgNum
=
taosHashGetSize
(
dbCache
->
vgCache
.
vgInfo
->
vgHash
);
}
}
ctgDebug
(
"[%d] db [%.*s][0x%"
PRIx64
"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, prefix:%d, suffix:%d, vgNum:%d"
,
i
,
(
int32_t
)
len
,
dbFName
,
dbCache
->
dbId
,
dbCache
->
deleted
?
"deleted"
:
""
,
metaNum
,
stbNum
,
vgVersion
,
hashMethod
,
hashPrefix
,
hashSuffix
,
vgNum
);
ctgDebug
(
"[%d] db [%.*s][0x%"
PRIx64
"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, prefix:%d, suffix:%d, vgNum:%d"
,
i
,
(
int32_t
)
len
,
dbFName
,
dbCache
->
dbId
,
dbCache
->
deleted
?
"deleted"
:
""
,
metaNum
,
stbNum
,
vgVersion
,
hashMethod
,
hashPrefix
,
hashSuffix
,
vgNum
);
pIter
=
taosHashIterate
(
dbHash
,
pIter
);
}
}
void
ctgdShowClusterCache
(
SCatalog
*
pCtg
)
{
void
ctgdShowClusterCache
(
SCatalog
*
pCtg
)
{
if
(
!
gCTGDebug
.
cacheEnable
||
NULL
==
pCtg
)
{
return
;
}
ctgDebug
(
"## cluster 0x%"
PRIx64
" %p cache Info BEGIN ##"
,
pCtg
->
clusterId
,
pCtg
);
ctgDebug
(
"db:%d meta:%d stb:%d dbRent:%d stbRent:%d"
,
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_META_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_RENT_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_RENT_NUM
));
ctgDebug
(
"## cluster 0x%"
PRIx64
" %p cache Info BEGIN ##"
,
pCtg
->
clusterId
,
pCtg
);
ctgDebug
(
"db:%d meta:%d stb:%d dbRent:%d stbRent:%d"
,
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_META_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_RENT_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_RENT_NUM
));
ctgdShowDBCache
(
pCtg
,
pCtg
->
dbCache
);
ctgDebug
(
"## cluster 0x%"
PRIx64
" %p cache Info END ##"
,
pCtg
->
clusterId
,
pCtg
);
ctgDebug
(
"## cluster 0x%"
PRIx64
" %p cache Info END ##"
,
pCtg
->
clusterId
,
pCtg
);
}
int32_t
ctgdShowCacheInfo
(
void
)
{
...
...
@@ -413,19 +418,18 @@ int32_t ctgdShowCacheInfo(void) {
CTG_API_ENTER
();
qDebug
(
"# total catalog cluster number %d #"
,
taosHashGetSize
(
gCtgMgmt
.
pCluster
));
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
while
(
pIter
)
{
pCtg
=
*
(
SCatalog
**
)
pIter
;
if
(
pCtg
)
{
ctgdShowClusterCache
(
pCtg
);
}
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
pIter
);
}
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
source/libs/catalog/src/ctgUtil.c
浏览文件 @
d5255f1e
此差异已折叠。
点击以展开。
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
d5255f1e
此差异已折叠。
点击以展开。
source/libs/command/test/commandTest.cpp
浏览文件 @
d5255f1e
...
...
@@ -16,6 +16,6 @@
#include <gtest/gtest.h>
int
main
(
int
argc
,
char
*
argv
[])
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
source/libs/executor/inc/dataSinkInt.h
浏览文件 @
d5255f1e
...
...
@@ -20,16 +20,16 @@
extern
"C"
{
#endif
#include "tcommon.h"
#include "dataSinkMgt.h"
#include "plannodes.h"
#include "tcommon.h"
struct
SDataSink
;
struct
SDataSinkHandle
;
typedef
struct
SDataSinkManager
{
SDataSinkMgtCfg
cfg
;
TdThreadMutex
mutex
;
TdThreadMutex
mutex
;
}
SDataSinkManager
;
typedef
int32_t
(
*
FPutDataBlock
)(
struct
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
);
...
...
@@ -40,17 +40,19 @@ typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef
int32_t
(
*
FGetCacheSize
)(
struct
SDataSinkHandle
*
pHandle
,
uint64_t
*
size
);
typedef
struct
SDataSinkHandle
{
FPutDataBlock
fPut
;
FEndPut
fEndPut
;
FGetDataLength
fGetLen
;
FGetDataBlock
fGetData
;
FPutDataBlock
fPut
;
FEndPut
fEndPut
;
FGetDataLength
fGetLen
;
FGetDataBlock
fGetData
;
FDestroyDataSinker
fDestroy
;
FGetCacheSize
fGetCacheSize
;
FGetCacheSize
fGetCacheSize
;
}
SDataSinkHandle
;
int32_t
createDataDispatcher
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
);
int32_t
createDataDeleter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
);
int32_t
createDataInserter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
);
int32_t
createDataDeleter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
);
int32_t
createDataInserter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
);
#ifdef __cplusplus
}
...
...
source/libs/executor/inc/executorInt.h
浏览文件 @
d5255f1e
...
...
@@ -23,10 +23,10 @@ extern "C" {
extern
int32_t
exchangeObjRefPool
;
typedef
struct
{
char
*
pData
;
bool
isNull
;
int16_t
type
;
int32_t
bytes
;
char
*
pData
;
bool
isNull
;
int16_t
type
;
int32_t
bytes
;
}
SGroupKeys
,
SStateKeys
;
uint64_t
calcGroupId
(
char
*
pData
,
int32_t
len
);
...
...
source/libs/executor/inc/tlinearhash.h
浏览文件 @
d5255f1e
...
...
@@ -24,7 +24,7 @@ extern "C" {
enum
{
LINEAR_HASH_STATIS
=
0x1
,
LINEAR_HASH_DATA
=
0x2
,
LINEAR_HASH_DATA
=
0x2
,
};
typedef
struct
SLHashObj
SLHashObj
;
...
...
@@ -32,11 +32,11 @@ typedef struct SLHashObj SLHashObj;
SLHashObj
*
tHashInit
(
int32_t
inMemPages
,
int32_t
pageSize
,
_hash_fn_t
fn
,
int32_t
numOfTuplePerPage
);
void
*
tHashCleanup
(
SLHashObj
*
pHashObj
);
int32_t
tHashPut
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
*
data
,
size_t
size
);
char
*
tHashGet
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
);
int32_t
tHashRemove
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
);
int32_t
tHashPut
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
*
data
,
size_t
size
);
char
*
tHashGet
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
);
int32_t
tHashRemove
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
);
void
tHashPrint
(
const
SLHashObj
*
pHashObj
,
int32_t
type
);
void
tHashPrint
(
const
SLHashObj
*
pHashObj
,
int32_t
type
);
#ifdef __cplusplus
}
...
...
source/libs/executor/inc/tsimplehash.h
浏览文件 @
d5255f1e
...
...
@@ -112,7 +112,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj);
size_t
tSimpleHashGetMemSize
(
const
SSHashObj
*
pHashObj
);
#pragma pack(push, 4)
typedef
struct
SHNode
{
typedef
struct
SHNode
{
struct
SHNode
*
next
;
uint32_t
keyLen
:
20
;
uint32_t
dataLen
:
12
;
...
...
source/libs/executor/inc/tsort.h
浏览文件 @
d5255f1e
...
...
@@ -20,8 +20,8 @@
extern
"C"
{
#endif
#include "tcommon.h"
#include "os.h"
#include "tcommon.h"
enum
{
SORT_MULTISOURCE_MERGE
=
0x1
,
...
...
@@ -31,29 +31,29 @@ enum {
typedef
struct
SMultiMergeSource
{
int32_t
type
;
int32_t
rowIndex
;
SSDataBlock
*
pBlock
;
SSDataBlock
*
pBlock
;
}
SMultiMergeSource
;
typedef
struct
SSortSource
{
SMultiMergeSource
src
;
union
{
struct
{
SArray
*
pageIdList
;
int32_t
pageIndex
;
union
{
struct
{
SArray
*
pageIdList
;
int32_t
pageIndex
;
};
void
*
param
;
void
*
param
;
};
}
SSortSource
;
typedef
struct
SMsortComparParam
{
void
**
pSources
;
int32_t
numOfSources
;
SArray
*
orderInfo
;
// SArray<SBlockOrderInfo>
bool
cmpGroupId
;
void
**
pSources
;
int32_t
numOfSources
;
SArray
*
orderInfo
;
// SArray<SBlockOrderInfo>
bool
cmpGroupId
;
}
SMsortComparParam
;
typedef
struct
SSortHandle
SSortHandle
;
typedef
struct
SSortHandle
SSortHandle
;
typedef
struct
STupleHandle
STupleHandle
;
typedef
SSDataBlock
*
(
*
_sort_fetch_block_fn_t
)(
void
*
param
);
...
...
@@ -64,7 +64,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @param type
* @return
*/
SSortHandle
*
tsortCreateSortHandle
(
SArray
*
pOrderInfo
,
int32_t
type
,
int32_t
pageSize
,
int32_t
numOfPages
,
SSDataBlock
*
pBlock
,
const
char
*
idstr
);
SSortHandle
*
tsortCreateSortHandle
(
SArray
*
pOrderInfo
,
int32_t
type
,
int32_t
pageSize
,
int32_t
numOfPages
,
SSDataBlock
*
pBlock
,
const
char
*
idstr
);
/**
*
...
...
@@ -90,7 +91,8 @@ int32_t tsortClose(SSortHandle* pHandle);
*
* @return
*/
int32_t
tsortSetFetchRawDataFp
(
SSortHandle
*
pHandle
,
_sort_fetch_block_fn_t
fetchFp
,
void
(
*
fp
)(
SSDataBlock
*
,
void
*
),
void
*
param
);
int32_t
tsortSetFetchRawDataFp
(
SSortHandle
*
pHandle
,
_sort_fetch_block_fn_t
fetchFp
,
void
(
*
fp
)(
SSDataBlock
*
,
void
*
),
void
*
param
);
/**
*
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
d5255f1e
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "function.h"
#include "os.h"
#include "tname.h"
#include "tdatablock.h"
...
...
@@ -26,8 +26,8 @@
#include "ttypes.h"
static
SSDataBlock
*
doScanCache
(
SOperatorInfo
*
pOperator
);
static
void
destroyLastrowScanOperator
(
void
*
param
);
static
int32_t
extractTargetSlotId
(
const
SArray
*
pColMatchInfo
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
**
pSlotIds
);
static
void
destroyLastrowScanOperator
(
void
*
param
);
static
int32_t
extractTargetSlotId
(
const
SArray
*
pColMatchInfo
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
**
pSlotIds
);
SOperatorInfo
*
createCacherowsScanOperator
(
SLastRowScanPhysiNode
*
pScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
@@ -40,11 +40,11 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
}
pInfo
->
readHandle
=
*
readHandle
;
pInfo
->
pRes
=
createResDataBlock
(
pScanNode
->
scan
.
node
.
pOutputDataBlockDesc
);
pInfo
->
pRes
=
createResDataBlock
(
pScanNode
->
scan
.
node
.
pOutputDataBlockDesc
);
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanNode
->
scan
.
pScanCols
,
pScanNode
->
scan
.
node
.
pOutputDataBlockDesc
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanNode
->
scan
.
pScanCols
,
pScanNode
->
scan
.
node
.
pOutputDataBlockDesc
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
code
=
extractTargetSlotId
(
pInfo
->
pColMatchInfo
,
pTaskInfo
,
&
pInfo
->
pSlotIds
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -58,7 +58,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
// partition by tbname
if
(
taosArrayGetSize
(
pTableList
->
pGroupList
)
==
taosArrayGetSize
(
pTableList
->
pTableList
))
{
pInfo
->
retrieveType
=
CACHESCAN_RETRIEVE_TYPE_ALL
|
CACHESCAN_RETRIEVE_LAST_ROW
;
pInfo
->
retrieveType
=
CACHESCAN_RETRIEVE_TYPE_ALL
|
CACHESCAN_RETRIEVE_LAST_ROW
;
code
=
tsdbCacherowsReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pTableList
->
pTableList
,
taosArrayGetSize
(
pInfo
->
pColMatchInfo
),
&
pInfo
->
pLastrowReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -67,23 +67,24 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pInfo
->
pBufferredRes
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pBufferredRes
,
pOperator
->
resultInfo
.
capacity
);
}
else
{
// by tags
pInfo
->
retrieveType
=
CACHESCAN_RETRIEVE_TYPE_SINGLE
|
CACHESCAN_RETRIEVE_LAST_ROW
;
}
else
{
// by tags
pInfo
->
retrieveType
=
CACHESCAN_RETRIEVE_TYPE_SINGLE
|
CACHESCAN_RETRIEVE_LAST_ROW
;
}
if
(
pScanNode
->
scan
.
pScanPseudoCols
!=
NULL
)
{
SExprSupp
*
pPseudoExpr
=
&
pInfo
->
pseudoExprSup
;
pPseudoExpr
->
pExprInfo
=
createExprInfo
(
pScanNode
->
scan
.
pScanPseudoCols
,
NULL
,
&
pPseudoExpr
->
numOfExprs
);
pPseudoExpr
->
pCtx
=
createSqlFunctionCtx
(
pPseudoExpr
->
pExprInfo
,
pPseudoExpr
->
numOfExprs
,
&
pPseudoExpr
->
rowEntryInfoOffset
);
pPseudoExpr
->
pCtx
=
createSqlFunctionCtx
(
pPseudoExpr
->
pExprInfo
,
pPseudoExpr
->
numOfExprs
,
&
pPseudoExpr
->
rowEntryInfoOffset
);
}
pOperator
->
name
=
"LastrowScanOperator"
;
pOperator
->
name
=
"LastrowScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
fpSet
=
...
...
@@ -92,7 +93,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
_error:
_error:
pTaskInfo
->
code
=
code
;
destroyLastrowScanOperator
(
pInfo
);
taosMemoryFree
(
pOperator
);
...
...
@@ -121,7 +122,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
blockDataCleanup
(
pInfo
->
pBufferredRes
);
taosArrayClear
(
pInfo
->
pUidList
);
int32_t
code
=
tsdbRetrieveCacheRows
(
pInfo
->
pLastrowReader
,
pInfo
->
pBufferredRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
int32_t
code
=
tsdbRetrieveCacheRows
(
pInfo
->
pLastrowReader
,
pInfo
->
pBufferredRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -133,15 +135,15 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
if
(
pInfo
->
indexOfBufferedRes
<
pInfo
->
pBufferredRes
->
info
.
rows
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pColMatchInfo
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pColMatchInfo
);
++
i
)
{
SColMatchInfo
*
pMatchInfo
=
taosArrayGet
(
pInfo
->
pColMatchInfo
,
i
);
int32_t
slotId
=
pMatchInfo
->
targetSlotId
;
int32_t
slotId
=
pMatchInfo
->
targetSlotId
;
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pInfo
->
pBufferredRes
->
pDataBlock
,
slotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
slotId
);
char
*
p
=
colDataGetData
(
pSrc
,
pInfo
->
indexOfBufferedRes
);
bool
isNull
=
colDataIsNull_s
(
pSrc
,
pInfo
->
indexOfBufferedRes
);
bool
isNull
=
colDataIsNull_s
(
pSrc
,
pInfo
->
indexOfBufferedRes
);
colDataAppend
(
pDst
,
0
,
p
,
isNull
);
}
...
...
@@ -150,8 +152,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if
(
pInfo
->
pseudoExprSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pInfo
->
pseudoExprSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -180,7 +182,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SArray
*
pGroupTableList
=
taosArrayGetP
(
pTableList
->
pGroupList
,
pInfo
->
currentGroupIndex
);
tsdbCacherowsReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pGroupTableList
,
taosArrayGetSize
(
pInfo
->
pColMatchInfo
),
&
pInfo
->
pLastrowReader
);
taosArrayGetSize
(
pInfo
->
pColMatchInfo
),
&
pInfo
->
pLastrowReader
);
taosArrayClear
(
pInfo
->
pUidList
);
int32_t
code
=
tsdbRetrieveCacheRows
(
pInfo
->
pLastrowReader
,
pInfo
->
pRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
...
...
@@ -200,8 +202,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo
->
pRes
->
info
.
groupId
=
pKeyInfo
->
groupId
;
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
}
...
...
@@ -233,10 +235,10 @@ void destroyLastrowScanOperator(void* param) {
}
int32_t
extractTargetSlotId
(
const
SArray
*
pColMatchInfo
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
**
pSlotIds
)
{
size_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
size_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
*
pSlotIds
=
taosMemoryMalloc
(
numOfCols
*
sizeof
(
int32_t
));
if
(
*
pSlotIds
==
NULL
)
{
if
(
*
pSlotIds
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
d5255f1e
...
...
@@ -93,8 +93,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pBuf
->
useSize
=
sizeof
(
SDataCacheEntry
);
blockEncode
(
pInput
->
pData
,
pEntry
->
data
,
&
pEntry
->
dataLen
,
numOfCols
,
pEntry
->
compressed
);
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
pBuf
->
useSize
+=
pEntry
->
dataLen
;
...
...
@@ -103,14 +103,14 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
}
static
bool
allocBuf
(
SDataDispatchHandle
*
pDispatcher
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
/*
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueItemSize(pDispatcher->pDataBlocks));
return false;
}
*/
/*
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueItemSize(pDispatcher->pDataBlocks));
return false;
}
*/
pBuf
->
allocSize
=
sizeof
(
SDataCacheEntry
)
+
blockGetEncodeSize
(
pInput
->
pData
);
...
...
@@ -176,11 +176,12 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pDispatcher
->
nextOutput
.
pData
;
*
pLen
=
pEntry
->
dataLen
;
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
*
pQueryEnd
=
pDispatcher
->
queryEnd
;
qDebug
(
"got data len %"
PRId64
", row num %d in sink"
,
*
pLen
,
((
SDataCacheEntry
*
)(
pDispatcher
->
nextOutput
.
pData
))
->
numOfRows
);
qDebug
(
"got data len %"
PRId64
", row num %d in sink"
,
*
pLen
,
((
SDataCacheEntry
*
)(
pDispatcher
->
nextOutput
.
pData
))
->
numOfRows
);
}
static
int32_t
getDataBlock
(
SDataSinkHandle
*
pHandle
,
SOutputData
*
pOutput
)
{
...
...
@@ -199,8 +200,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
numOfCols
=
pEntry
->
numOfCols
;
pOutput
->
compressed
=
pEntry
->
compressed
;
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
atomic_sub_fetch_64
(
&
pDispatcher
->
cachedSize
,
pEntry
->
dataLen
);
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
...
...
source/libs/executor/src/dataInserter.c
浏览文件 @
d5255f1e
...
...
@@ -27,7 +27,7 @@ extern SDataSinkStat gDataSinkStat;
typedef
struct
SSubmitRes
{
int64_t
affectedRows
;
int32_t
code
;
SSubmitRsp
*
pRsp
;
SSubmitRsp
*
pRsp
;
}
SSubmitRes
;
typedef
struct
SDataInserterHandle
{
...
...
@@ -44,7 +44,7 @@ typedef struct SDataInserterHandle {
uint64_t
useconds
;
uint64_t
cachedSize
;
TdThreadMutex
mutex
;
tsem_t
ready
;
tsem_t
ready
;
}
SDataInserterHandle
;
typedef
struct
SSubmitRspParam
{
...
...
@@ -52,14 +52,14 @@ typedef struct SSubmitRspParam {
}
SSubmitRspParam
;
int32_t
inserterCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSubmitRspParam
*
pParam
=
(
SSubmitRspParam
*
)
param
;
SSubmitRspParam
*
pParam
=
(
SSubmitRspParam
*
)
param
;
SDataInserterHandle
*
pInserter
=
pParam
->
pInserter
;
pInserter
->
submitRes
.
code
=
code
;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pInserter
->
submitRes
.
pRsp
=
taosMemoryCalloc
(
1
,
sizeof
(
SSubmitRsp
));
SDecoder
coder
=
{
0
};
SDecoder
coder
=
{
0
};
tDecoderInit
(
&
coder
,
pMsg
->
pData
,
pMsg
->
len
);
code
=
tDecodeSSubmitRsp
(
&
coder
,
pInserter
->
submitRes
.
pRsp
);
if
(
code
)
{
...
...
@@ -67,10 +67,10 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
pInserter
->
submitRes
.
code
=
code
;
goto
_return
;
}
if
(
pInserter
->
submitRes
.
pRsp
->
nBlocks
>
0
)
{
for
(
int32_t
i
=
0
;
i
<
pInserter
->
submitRes
.
pRsp
->
nBlocks
;
++
i
)
{
SSubmitBlkRsp
*
blk
=
pInserter
->
submitRes
.
pRsp
->
pBlocks
+
i
;
SSubmitBlkRsp
*
blk
=
pInserter
->
submitRes
.
pRsp
->
pBlocks
+
i
;
if
(
TSDB_CODE_SUCCESS
!=
blk
->
code
)
{
code
=
blk
->
code
;
tFreeSSubmitRsp
(
pInserter
->
submitRes
.
pRsp
);
...
...
@@ -79,9 +79,10 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
}
}
}
pInserter
->
submitRes
.
affectedRows
+=
pInserter
->
submitRes
.
pRsp
->
affectedRows
;
qDebug
(
"submit rsp received, affectedRows:%d, total:%d"
,
pInserter
->
submitRes
.
pRsp
->
affectedRows
,
pInserter
->
submitRes
.
affectedRows
);
qDebug
(
"submit rsp received, affectedRows:%d, total:%d"
,
pInserter
->
submitRes
.
pRsp
->
affectedRows
,
pInserter
->
submitRes
.
affectedRows
);
tFreeSSubmitRsp
(
pInserter
->
submitRes
.
pRsp
);
}
...
...
@@ -91,11 +92,10 @@ _return:
tsem_post
(
&
pInserter
->
ready
);
taosMemoryFree
(
pMsg
->
pData
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
sendSubmitRequest
(
SDataInserterHandle
*
pInserter
,
SSubmitReq
*
pMsg
,
void
*
pTransporter
,
SEpSet
*
pEpset
)
{
// send the fetch remote task result reques
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
...
...
@@ -109,7 +109,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
pParam
->
pInserter
=
pInserter
;
pMsgSendInfo
->
param
=
pParam
;
pMsgSendInfo
->
paramFreeFp
=
taosMemoryFree
;
pMsgSendInfo
->
paramFreeFp
=
taosMemoryFree
;
pMsgSendInfo
->
msgInfo
.
pData
=
pMsg
;
pMsgSendInfo
->
msgInfo
.
len
=
ntohl
(
pMsg
->
length
);
pMsgSendInfo
->
msgType
=
TDMT_VND_SUBMIT
;
...
...
@@ -119,17 +119,16 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
return
asyncSendMsgToServer
(
pTransporter
,
pEpset
,
&
transporterId
,
pMsgSendInfo
);
}
int32_t
dataBlockToSubmit
(
SDataInserterHandle
*
pInserter
,
SSubmitReq
**
pReq
)
{
const
SArray
*
pBlocks
=
pInserter
->
pDataBlocks
;
const
STSchema
*
pTSchema
=
pInserter
->
pSchema
;
int64_t
uid
=
pInserter
->
pNode
->
tableId
;
int64_t
suid
=
pInserter
->
pNode
->
stableId
;
int32_t
vgId
=
pInserter
->
pNode
->
vgId
;
bool
fullCol
=
(
pInserter
->
pNode
->
pCols
->
length
==
pTSchema
->
numOfCols
);
const
SArray
*
pBlocks
=
pInserter
->
pDataBlocks
;
const
STSchema
*
pTSchema
=
pInserter
->
pSchema
;
int64_t
uid
=
pInserter
->
pNode
->
tableId
;
int64_t
suid
=
pInserter
->
pNode
->
stableId
;
int32_t
vgId
=
pInserter
->
pNode
->
vgId
;
bool
fullCol
=
(
pInserter
->
pNode
->
pCols
->
length
==
pTSchema
->
numOfCols
);
SSubmitReq
*
ret
=
NULL
;
int32_t
sz
=
taosArrayGetSize
(
pBlocks
);
int32_t
sz
=
taosArrayGetSize
(
pBlocks
);
// cal size
int32_t
cap
=
sizeof
(
SSubmitReq
);
...
...
@@ -164,7 +163,7 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
int32_t
rows
=
0
;
int32_t
dataLen
=
0
;
STSRow
*
rowData
=
POINTER_SHIFT
(
blkHead
,
sizeof
(
SSubmitBlk
));
int64_t
lastTs
=
TSKEY_MIN
;
int64_t
lastTs
=
TSKEY_MIN
;
bool
ignoreRow
=
false
;
for
(
int32_t
j
=
0
;
j
<
pDataBlock
->
info
.
rows
;
j
++
)
{
SRowBuilder
rb
=
{
0
};
...
...
@@ -176,13 +175,13 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
for
(
int32_t
k
=
0
;
k
<
pTSchema
->
numOfCols
;
k
++
)
{
const
STColumn
*
pColumn
=
&
pTSchema
->
columns
[
k
];
SColumnInfoData
*
pColData
=
NULL
;
int16_t
colIdx
=
k
;
int16_t
colIdx
=
k
;
if
(
!
fullCol
)
{
int16_t
*
slotId
=
taosHashGet
(
pInserter
->
pCols
,
&
pColumn
->
colId
,
sizeof
(
pColumn
->
colId
));
int16_t
*
slotId
=
taosHashGet
(
pInserter
->
pCols
,
&
pColumn
->
colId
,
sizeof
(
pColumn
->
colId
));
if
(
NULL
==
slotId
)
{
continue
;
}
colIdx
=
*
slotId
;
}
...
...
@@ -192,13 +191,13 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
terrno
=
TSDB_CODE_APP_ERROR
;
return
TSDB_CODE_APP_ERROR
;
}
if
(
colDataIsNull_s
(
pColData
,
j
))
{
if
(
0
==
k
&&
TSDB_DATA_TYPE_TIMESTAMP
==
pColumn
->
type
)
{
ignoreRow
=
true
;
break
;
}
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
pColumn
->
offset
,
k
);
}
else
{
void
*
data
=
colDataGetData
(
pColData
,
j
);
...
...
@@ -213,7 +212,7 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
pColumn
->
offset
,
k
);
}
}
if
(
!
fullCol
)
{
if
(
!
fullCol
)
{
rb
.
hasNone
=
true
;
}
tdSRowEnd
(
&
rb
);
...
...
@@ -221,13 +220,13 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
if
(
ignoreRow
)
{
continue
;
}
rows
++
;
int32_t
rowLen
=
TD_ROW_LEN
(
rowData
);
rowData
=
POINTER_SHIFT
(
rowData
,
rowLen
);
dataLen
+=
rowLen
;
}
blkHead
->
dataLen
=
htonl
(
dataLen
);
blkHead
->
numOfRows
=
htonl
(
rows
);
...
...
@@ -242,12 +241,11 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
)
{
SDataInserterHandle
*
pInserter
=
(
SDataInserterHandle
*
)
pHandle
;
taosArrayPush
(
pInserter
->
pDataBlocks
,
&
pInput
->
pData
);
SSubmitReq
*
pMsg
=
NULL
;
int32_t
code
=
dataBlockToSubmit
(
pInserter
,
&
pMsg
);
int32_t
code
=
dataBlockToSubmit
(
pInserter
,
&
pMsg
);
if
(
code
)
{
return
code
;
}
...
...
@@ -264,7 +262,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
}
*
pContinue
=
true
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -279,10 +277,9 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
static
void
getDataLength
(
SDataSinkHandle
*
pHandle
,
int64_t
*
pLen
,
bool
*
pQueryEnd
)
{
SDataInserterHandle
*
pDispatcher
=
(
SDataInserterHandle
*
)
pHandle
;
*
pLen
=
pDispatcher
->
submitRes
.
affectedRows
;
qDebug
(
"got total affectedRows %"
PRId64
,
*
pLen
);
qDebug
(
"got total affectedRows %"
PRId64
,
*
pLen
);
}
static
int32_t
destroyDataSinker
(
SDataSinkHandle
*
pHandle
)
{
SDataInserterHandle
*
pInserter
=
(
SDataInserterHandle
*
)
pHandle
;
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pInserter
->
cachedSize
);
...
...
@@ -301,14 +298,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
createDataInserter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
)
{
int32_t
createDataInserter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
)
{
SDataInserterHandle
*
inserter
=
taosMemoryCalloc
(
1
,
sizeof
(
SDataInserterHandle
));
if
(
NULL
==
inserter
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
SQueryInserterNode
*
pInserterNode
=
(
SQueryInserterNode
*
)
pDataSink
;
SQueryInserterNode
*
pInserterNode
=
(
SQueryInserterNode
*
)
pDataSink
;
inserter
->
sink
.
fPut
=
putDataBlock
;
inserter
->
sink
.
fEndPut
=
endPut
;
inserter
->
sink
.
fGetLen
=
getDataLength
;
...
...
@@ -322,7 +320,8 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter
->
queryEnd
=
false
;
int64_t
suid
=
0
;
int32_t
code
=
tsdbGetTableSchema
(
inserter
->
pParam
->
readHandle
->
vnode
,
pInserterNode
->
tableId
,
&
inserter
->
pSchema
,
&
suid
);
int32_t
code
=
tsdbGetTableSchema
(
inserter
->
pParam
->
readHandle
->
vnode
,
pInserterNode
->
tableId
,
&
inserter
->
pSchema
,
&
suid
);
if
(
code
)
{
return
code
;
}
...
...
@@ -339,15 +338,16 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
inserter
->
pCols
=
taosHashInit
(
pInserterNode
->
pCols
->
length
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_SMALLINT
),
false
,
HASH_NO_LOCK
);
inserter
->
pCols
=
taosHashInit
(
pInserterNode
->
pCols
->
length
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_SMALLINT
),
false
,
HASH_NO_LOCK
);
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pInserterNode
->
pCols
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
taosHashPut
(
inserter
->
pCols
,
&
pCol
->
colId
,
sizeof
(
pCol
->
colId
),
&
pCol
->
slotId
,
sizeof
(
pCol
->
slotId
));
}
tsem_init
(
&
inserter
->
ready
,
0
,
0
);
*
pHandle
=
inserter
;
return
TSDB_CODE_SUCCESS
;
}
source/libs/executor/src/dataSinkMgt.c
浏览文件 @
d5255f1e
...
...
@@ -13,28 +13,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tarray.h"
#include "dataSinkMgt.h"
#include "dataSinkInt.h"
#include "planner.h"
#include "tarray.h"
static
SDataSinkManager
gDataSinkManager
=
{
0
};
SDataSinkStat
gDataSinkStat
=
{
0
};
SDataSinkStat
gDataSinkStat
=
{
0
};
int32_t
dsDataSinkMgtInit
(
SDataSinkMgtCfg
*
cfg
)
{
int32_t
dsDataSinkMgtInit
(
SDataSinkMgtCfg
*
cfg
)
{
gDataSinkManager
.
cfg
=
*
cfg
;
taosThreadMutexInit
(
&
gDataSinkManager
.
mutex
,
NULL
);
return
0
;
// to avoid compiler eror
return
0
;
// to avoid compiler eror
}
int32_t
dsDataSinkGetCacheSize
(
SDataSinkStat
*
pStat
)
{
int32_t
dsDataSinkGetCacheSize
(
SDataSinkStat
*
pStat
)
{
pStat
->
cachedSize
=
atomic_load_64
(
&
gDataSinkStat
.
cachedSize
);
return
0
;
}
int32_t
dsCreateDataSinker
(
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
)
{
int32_t
dsCreateDataSinker
(
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
)
{
switch
((
int
)
nodeType
(
pDataSink
))
{
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
return
createDataDispatcher
(
&
gDataSinkManager
,
pDataSink
,
pHandle
);
...
...
@@ -66,12 +65,11 @@ int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
return
pHandleImpl
->
fGetData
(
pHandleImpl
,
pOutput
);
}
int32_t
dsGetCacheSize
(
DataSinkHandle
handle
,
uint64_t
*
pSize
)
{
int32_t
dsGetCacheSize
(
DataSinkHandle
handle
,
uint64_t
*
pSize
)
{
SDataSinkHandle
*
pHandleImpl
=
(
SDataSinkHandle
*
)
handle
;
return
pHandleImpl
->
fGetCacheSize
(
pHandleImpl
,
pSize
);
}
void
dsScheduleProcess
(
void
*
ahandle
,
void
*
pItem
)
{
// todo
}
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
d5255f1e
...
...
@@ -267,18 +267,19 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
size_t
rightNumJoin
=
taosArrayGetSize
(
rightRowLocations
);
code
=
blockDataEnsureCapacity
(
pRes
,
*
nRows
+
leftNumJoin
*
rightNumJoin
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s can not ensure block capacity for join. left: %zu, right: %zu"
,
GET_TASKID
(
pOperator
->
pTaskInfo
),
leftNumJoin
,
rightNumJoin
);
qError
(
"%s can not ensure block capacity for join. left: %zu, right: %zu"
,
GET_TASKID
(
pOperator
->
pTaskInfo
),
leftNumJoin
,
rightNumJoin
);
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
for
(
int32_t
i
=
0
;
i
<
leftNumJoin
;
++
i
)
{
for
(
int32_t
j
=
0
;
j
<
rightNumJoin
;
++
j
)
{
SRowLocation
*
leftRow
=
taosArrayGet
(
leftRowLocations
,
i
);
SRowLocation
*
rightRow
=
taosArrayGet
(
rightRowLocations
,
j
);
mergeJoinJoinLeftRight
(
pOperator
,
pRes
,
*
nRows
,
leftRow
->
pDataBlock
,
leftRow
->
pos
,
rightRow
->
pDataBlock
,
rightRow
->
pos
);
++*
nRows
;
}
for
(
int32_t
i
=
0
;
i
<
leftNumJoin
;
++
i
)
{
for
(
int32_t
j
=
0
;
j
<
rightNumJoin
;
++
j
)
{
SRowLocation
*
leftRow
=
taosArrayGet
(
leftRowLocations
,
i
);
SRowLocation
*
rightRow
=
taosArrayGet
(
rightRowLocations
,
j
);
mergeJoinJoinLeftRight
(
pOperator
,
pRes
,
*
nRows
,
leftRow
->
pDataBlock
,
leftRow
->
pos
,
rightRow
->
pDataBlock
,
rightRow
->
pos
);
++*
nRows
;
}
}
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
rightCreatedBlocks
);
++
i
)
{
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
d5255f1e
...
...
@@ -658,7 +658,8 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
blockDataDestroy
(
p
);
qDebug
(
"%s get sorted block, groupId:%0x"
PRIx64
" rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pDataBlock
->
info
.
groupId
,
pDataBlock
->
info
.
rows
);
qDebug
(
"%s get sorted block, groupId:%0x"
PRIx64
" rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pDataBlock
->
info
.
groupId
,
pDataBlock
->
info
.
rows
);
return
(
pDataBlock
->
info
.
rows
>
0
)
?
pDataBlock
:
NULL
;
}
...
...
source/libs/executor/src/tfill.c
浏览文件 @
d5255f1e
...
...
@@ -561,7 +561,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
int32_t
numOfRows
=
taosNumOfRemainRows
(
pFillInfo
);
TSKEY
ekey1
=
ekey
;
int64_t
numOfRes
=
-
1
;
if
(
numOfRows
>
0
)
{
// still fill gap within current data block, not generating data after the result set.
TSKEY
lastKey
=
tsList
[
pFillInfo
->
numOfRows
-
1
];
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
d5255f1e
...
...
@@ -1218,7 +1218,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
pBlock
->
info
.
rows
,
numOfOutput
);
}
static
int32_t
openStateWindowAggOptr
(
SOperatorInfo
*
pOperator
)
{
static
int32_t
openStateWindowAggOptr
(
SOperatorInfo
*
pOperator
)
{
if
(
OPTR_IS_OPENED
(
pOperator
))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2757,7 +2757,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
if
(
pStateNode
->
window
.
pExprs
!=
NULL
)
{
int32_t
numOfScalarExpr
=
0
;
SExprInfo
*
pScalarExprInfo
=
createExprInfo
(
pStateNode
->
window
.
pExprs
,
NULL
,
&
numOfScalarExpr
);
int32_t
code
=
initExprSupp
(
&
pInfo
->
scalarSup
,
pScalarExprInfo
,
numOfScalarExpr
);
int32_t
code
=
initExprSupp
(
&
pInfo
->
scalarSup
,
pScalarExprInfo
,
numOfScalarExpr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
source/libs/executor/src/tlinearhash.c
浏览文件 @
d5255f1e
...
...
@@ -14,27 +14,27 @@
*/
#include "tlinearhash.h"
#include "tdef.h"
#include "taoserror.h"
#include "tdef.h"
#include "tpagedbuf.h"
#define LHASH_CAP_RATIO 0.85
// Always located in memory
typedef
struct
SLHashBucket
{
SArray
*
pPageIdList
;
int32_t
size
;
// the number of element in this entry
SArray
*
pPageIdList
;
int32_t
size
;
// the number of element in this entry
}
SLHashBucket
;
struct
SLHashObj
{
SDiskbasedBuf
*
pBuf
;
SDiskbasedBuf
*
pBuf
;
_hash_fn_t
hashFn
;
SLHashBucket
**
pBucket
;
// entry list
SLHashBucket
**
pBucket
;
// entry list
int32_t
tuplesPerPage
;
int32_t
numOfAlloc
;
// number of allocated bucket ptr slot
int32_t
bits
;
// the number of bits used in hash
int32_t
numOfBuckets
;
// the number of buckets
int64_t
size
;
// the number of total items
int32_t
numOfAlloc
;
// number of allocated bucket ptr slot
int32_t
bits
;
// the number of bits used in hash
int32_t
numOfBuckets
;
// the number of buckets
int64_t
size
;
// the number of total items
};
/**
...
...
@@ -44,19 +44,17 @@ struct SLHashObj {
* +-----------+-------+--------+
*/
typedef
struct
SLHashNode
{
uint16_t
keyLen
;
uint16_t
dataLen
;
uint16_t
keyLen
;
uint16_t
dataLen
;
}
SLHashNode
;
#define GET_LHASH_NODE_KEY(_n)
(((char*)(_n)) + sizeof(SLHashNode))
#define GET_LHASH_NODE_DATA(_n)
((char*)(_n) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen)
#define GET_LHASH_NODE_LEN(_n)
(sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen)
#define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode))
#define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen)
#define GET_LHASH_NODE_LEN(_n) (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen)
static
int32_t
doAddNewBucket
(
SLHashObj
*
pHashObj
);
static
int32_t
doGetBucketIdFromHashVal
(
int32_t
hashv
,
int32_t
bits
)
{
return
hashv
&
((
1ul
<<
(
bits
))
-
1
);
}
static
int32_t
doGetBucketIdFromHashVal
(
int32_t
hashv
,
int32_t
bits
)
{
return
hashv
&
((
1ul
<<
(
bits
))
-
1
);
}
static
int32_t
doGetAlternativeBucketId
(
int32_t
bucketId
,
int32_t
bits
,
int32_t
numOfBuckets
)
{
int32_t
v
=
bucketId
-
(
1ul
<<
(
bits
-
1
));
...
...
@@ -70,9 +68,9 @@ static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) {
}
static
void
doCopyObject
(
char
*
p
,
const
void
*
key
,
int32_t
keyLen
,
const
void
*
data
,
int32_t
size
)
{
*
(
uint16_t
*
)
p
=
keyLen
;
*
(
uint16_t
*
)
p
=
keyLen
;
p
+=
sizeof
(
uint16_t
);
*
(
uint16_t
*
)
p
=
size
;
*
(
uint16_t
*
)
p
=
size
;
p
+=
sizeof
(
uint16_t
);
memcpy
(
p
,
key
,
keyLen
);
...
...
@@ -86,7 +84,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
int32_t
pageId
=
*
(
int32_t
*
)
taosArrayGetLast
(
pBucket
->
pPageIdList
);
SFilePage
*
pPage
=
getBufPage
(
pHashObj
->
pBuf
,
pageId
);
ASSERT
(
pPage
!=
NULL
);
ASSERT
(
pPage
!=
NULL
);
// put to current buf page
size_t
nodeSize
=
sizeof
(
SLHashNode
)
+
keyLen
+
size
;
...
...
@@ -96,7 +94,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
releaseBufPage
(
pHashObj
->
pBuf
,
pPage
);
// allocate the overflow buffer page to hold this k/v.
int32_t
newPageId
=
-
1
;
int32_t
newPageId
=
-
1
;
SFilePage
*
pNewPage
=
getNewBufPage
(
pHashObj
->
pBuf
,
&
newPageId
);
if
(
pNewPage
==
NULL
)
{
return
terrno
;
...
...
@@ -110,7 +108,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
setBufPageDirty
(
pNewPage
,
true
);
releaseBufPage
(
pHashObj
->
pBuf
,
pNewPage
);
}
else
{
char
*
p
=
(
char
*
)
pPage
+
pPage
->
num
;
char
*
p
=
(
char
*
)
pPage
+
pPage
->
num
;
doCopyObject
(
p
,
key
,
keyLen
,
data
,
size
);
pPage
->
num
+=
nodeSize
;
setBufPageDirty
(
pPage
,
true
);
...
...
@@ -118,7 +116,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
}
pBucket
->
size
+=
1
;
// printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key);
// printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -127,7 +125,7 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket
ASSERT
(
pPage
!=
NULL
&&
pNode
!=
NULL
&&
pBucket
->
size
>=
1
);
int32_t
len
=
GET_LHASH_NODE_LEN
(
pNode
);
char
*
p
=
(
char
*
)
pNode
+
len
;
char
*
p
=
(
char
*
)
pNode
+
len
;
char
*
pEnd
=
(
char
*
)
pPage
+
pPage
->
num
;
memmove
(
pNode
,
p
,
(
pEnd
-
p
));
...
...
@@ -141,7 +139,7 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket
pBucket
->
size
-=
1
;
}
static
void
doTrimBucketPages
(
SLHashObj
*
pHashObj
,
SLHashBucket
*
pBucket
)
{
static
void
doTrimBucketPages
(
SLHashObj
*
pHashObj
,
SLHashBucket
*
pBucket
)
{
size_t
numOfPages
=
taosArrayGetSize
(
pBucket
->
pPageIdList
);
if
(
numOfPages
<=
1
)
{
return
;
...
...
@@ -188,7 +186,7 @@ static void doTrimBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
}
nodeSize
=
GET_LHASH_NODE_LEN
(
pStart
);
}
else
{
// move to the front of pLast page
}
else
{
// move to the front of pLast page
if
(
pStart
!=
pLast
->
data
)
{
memmove
(
pLast
->
data
,
pStart
,
(((
char
*
)
pLast
)
+
pLast
->
num
-
pStart
));
setBufPageDirty
(
pLast
,
true
);
...
...
@@ -214,7 +212,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
}
memset
(
p
+
POINTER_BYTES
*
pHashObj
->
numOfBuckets
,
0
,
newLen
-
pHashObj
->
numOfBuckets
);
pHashObj
->
pBucket
=
(
SLHashBucket
**
)
p
;
pHashObj
->
pBucket
=
(
SLHashBucket
**
)
p
;
pHashObj
->
numOfAlloc
=
newLen
;
}
...
...
@@ -226,7 +224,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
pageId
=
-
1
;
int32_t
pageId
=
-
1
;
SFilePage
*
p
=
getNewBufPage
(
pHashObj
->
pBuf
,
&
pageId
);
if
(
p
==
NULL
)
{
return
terrno
;
...
...
@@ -239,7 +237,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
taosArrayPush
(
pBucket
->
pPageIdList
,
&
pageId
);
pHashObj
->
numOfBuckets
+=
1
;
// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -266,14 +264,14 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
setBufPageCompressOnDisk
(
pHashObj
->
pBuf
,
false
);
/**
* The number of bits in the hash value, which is used to decide the exact bucket where the object should be located
in.
* The initial value is 0.
* The number of bits in the hash value, which is used to decide the exact bucket where the object should be located
*
in.
The initial value is 0.
*/
pHashObj
->
bits
=
0
;
pHashObj
->
bits
=
0
;
pHashObj
->
hashFn
=
fn
;
pHashObj
->
tuplesPerPage
=
numOfTuplePerPage
;
pHashObj
->
numOfAlloc
=
4
;
// initial allocated array list
pHashObj
->
numOfAlloc
=
4
;
// initial allocated array list
pHashObj
->
pBucket
=
taosMemoryCalloc
(
pHashObj
->
numOfAlloc
,
POINTER_BYTES
);
code
=
doAddNewBucket
(
pHashObj
);
...
...
@@ -289,7 +287,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
void
*
tHashCleanup
(
SLHashObj
*
pHashObj
)
{
destroyDiskbasedBuf
(
pHashObj
->
pBuf
);
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
numOfBuckets
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
numOfBuckets
;
++
i
)
{
taosArrayDestroy
(
pHashObj
->
pBucket
[
i
]
->
pPageIdList
);
taosMemoryFreeClear
(
pHashObj
->
pBucket
[
i
]);
}
...
...
@@ -299,7 +297,7 @@ void* tHashCleanup(SLHashObj* pHashObj) {
return
NULL
;
}
int32_t
tHashPut
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
*
data
,
size_t
size
)
{
int32_t
tHashPut
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
,
void
*
data
,
size_t
size
)
{
ASSERT
(
pHashObj
!=
NULL
&&
key
!=
NULL
);
if
(
pHashObj
->
bits
==
0
)
{
...
...
@@ -311,12 +309,12 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
if
(
v
>=
pHashObj
->
numOfBuckets
)
{
int32_t
newBucketId
=
doGetAlternativeBucketId
(
v
,
pHashObj
->
bits
,
pHashObj
->
numOfBuckets
);
// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
v
=
newBucketId
;
}
SLHashBucket
*
pBucket
=
pHashObj
->
pBucket
[
v
];
int32_t
code
=
doAddToBucket
(
pHashObj
,
pBucket
,
v
,
key
,
keyLen
,
data
,
size
);
int32_t
code
=
doAddToBucket
(
pHashObj
,
pBucket
,
v
,
key
,
keyLen
,
data
,
size
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -335,7 +333,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
int32_t
numOfBits
=
ceil
(
log
(
pHashObj
->
numOfBuckets
)
/
log
(
2
));
if
(
numOfBits
>
pHashObj
->
bits
)
{
// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
ASSERT
(
numOfBits
==
pHashObj
->
bits
+
1
);
pHashObj
->
bits
=
numOfBits
;
}
...
...
@@ -344,31 +342,31 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
// load all data in this bucket and check if the data needs to relocated into the new bucket
SLHashBucket
*
pBucket
=
pHashObj
->
pBucket
[
splitBucketId
];
// printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId);
// printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pBucket
->
pPageIdList
);
++
i
)
{
int32_t
pageId
=
*
(
int32_t
*
)
taosArrayGet
(
pBucket
->
pPageIdList
,
i
);
SFilePage
*
p
=
getBufPage
(
pHashObj
->
pBuf
,
pageId
);
char
*
pStart
=
p
->
data
;
while
(
pStart
-
((
char
*
)
p
)
<
p
->
num
)
{
while
(
pStart
-
((
char
*
)
p
)
<
p
->
num
)
{
SLHashNode
*
pNode
=
(
SLHashNode
*
)
pStart
;
ASSERT
(
pNode
->
keyLen
>
0
&&
pNode
->
dataLen
>=
0
);
char
*
k
=
GET_LHASH_NODE_KEY
(
pNode
);
char
*
k
=
GET_LHASH_NODE_KEY
(
pNode
);
int32_t
hashv
=
pHashObj
->
hashFn
(
k
,
pNode
->
keyLen
);
int32_t
v1
=
doGetBucketIdFromHashVal
(
hashv
,
pHashObj
->
bits
);
if
(
v1
!=
splitBucketId
)
{
// place it into the new bucket
ASSERT
(
v1
==
newBucketId
);
// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
SLHashBucket
*
pNewBucket
=
pHashObj
->
pBucket
[
newBucketId
];
doAddToBucket
(
pHashObj
,
pNewBucket
,
newBucketId
,
(
void
*
)
GET_LHASH_NODE_KEY
(
pNode
),
pNode
->
keyLen
,
GET_LHASH_NODE_KEY
(
pNode
),
pNode
->
dataLen
);
doRemoveFromBucket
(
p
,
pNode
,
pBucket
);
}
else
{
// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
int32_t
nodeSize
=
GET_LHASH_NODE_LEN
(
pStart
);
pStart
+=
nodeSize
;
...
...
@@ -383,7 +381,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
return
TSDB_CODE_SUCCESS
;
}
char
*
tHashGet
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
char
*
tHashGet
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
ASSERT
(
pHashObj
!=
NULL
&&
key
!=
NULL
&&
keyLen
>
0
);
int32_t
hashv
=
pHashObj
->
hashFn
(
key
,
keyLen
);
...
...
@@ -393,7 +391,7 @@ char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
}
SLHashBucket
*
pBucket
=
pHashObj
->
pBucket
[
bucketId
];
int32_t
num
=
taosArrayGetSize
(
pBucket
->
pPageIdList
);
int32_t
num
=
taosArrayGetSize
(
pBucket
->
pPageIdList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
int32_t
pageId
=
*
(
int32_t
*
)
taosArrayGet
(
pBucket
->
pPageIdList
,
i
);
...
...
@@ -418,7 +416,7 @@ char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
return
NULL
;
}
int32_t
tHashRemove
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
int32_t
tHashRemove
(
SLHashObj
*
pHashObj
,
const
void
*
key
,
size_t
keyLen
)
{
// todo
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -431,8 +429,8 @@ void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
if
(
type
==
LINEAR_HASH_DATA
)
{
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
numOfBuckets
;
++
i
)
{
// printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
// (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
// printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
// (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
}
}
else
{
dBufPrintStatis
(
pHashObj
->
pBuf
);
...
...
source/libs/executor/src/tsort.c
浏览文件 @
d5255f1e
此差异已折叠。
点击以展开。
source/libs/executor/test/lhashTests.cpp
浏览文件 @
d5255f1e
...
...
@@ -32,41 +32,42 @@ TEST(testCase, linear_hash_Tests) {
int64_t
st
=
taosGetTimestampUs
();
SLHashObj
*
pHashObj
=
tHashInit
(
4098
*
4
*
2
,
512
,
fn
,
40
);
for
(
int32_t
i
=
0
;
i
<
1000000
;
++
i
)
{
SLHashObj
*
pHashObj
=
tHashInit
(
4098
*
4
*
2
,
512
,
fn
,
40
);
for
(
int32_t
i
=
0
;
i
<
1000000
;
++
i
)
{
int32_t
code
=
tHashPut
(
pHashObj
,
&
i
,
sizeof
(
i
),
&
i
,
sizeof
(
i
));
assert
(
code
==
0
);
}
// tHashPrint(pHashObj, LINEAR_HASH_STATIS);
// tHashPrint(pHashObj, LINEAR_HASH_STATIS);
int64_t
et
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
1000000
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
1000000
;
++
i
)
{
if
(
i
==
950000
)
{
printf
(
"kf
\n
"
);
}
char
*
v
=
tHashGet
(
pHashObj
,
&
i
,
sizeof
(
i
));
if
(
v
!=
NULL
)
{
// printf("find value: %d, key:%d\n", *(int32_t*) v, i);
// printf("find value: %d, key:%d\n", *(int32_t*) v, i);
}
else
{
// printf("failed to found key:%d in hash\n", i);
// printf("failed to found key:%d in hash\n", i);
}
}
// tHashPrint(pHashObj, LINEAR_HASH_STATIS);
// tHashPrint(pHashObj, LINEAR_HASH_STATIS);
tHashCleanup
(
pHashObj
);
int64_t
et1
=
taosGetTimestampUs
();
SHashObj
*
pHashObj1
=
taosHashInit
(
1000
,
fn
,
false
,
HASH_NO_LOCK
);
for
(
int32_t
i
=
0
;
i
<
1000000
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
1000000
;
++
i
)
{
taosHashPut
(
pHashObj1
,
&
i
,
sizeof
(
i
),
&
i
,
sizeof
(
i
));
}
for
(
int32_t
i
=
0
;
i
<
1000000
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
1000000
;
++
i
)
{
void
*
v
=
taosHashGet
(
pHashObj1
,
&
i
,
sizeof
(
i
));
}
taosHashCleanup
(
pHashObj1
);
int64_t
et2
=
taosGetTimestampUs
();
printf
(
"linear hash time:%.2f ms, buildHash:%.2f ms, hash:%.2f
\n
"
,
(
et1
-
st
)
/
1000.0
,
(
et
-
st
)
/
1000.0
,
(
et2
-
et1
)
/
1000.0
);
printf
(
"linear hash time:%.2f ms, buildHash:%.2f ms, hash:%.2f
\n
"
,
(
et1
-
st
)
/
1000.0
,
(
et
-
st
)
/
1000.0
,
(
et2
-
et1
)
/
1000.0
);
}
\ No newline at end of file
source/libs/executor/test/sortTests.cpp
浏览文件 @
d5255f1e
...
...
@@ -25,14 +25,14 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "executorimpl.h"
#include "executor.h"
#include "executorimpl.h"
#include "taos.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdef.h"
#include "trpc.h"
#include "tvariant.h"
#include "tcompare.h"
namespace
{
typedef
struct
{
...
...
@@ -44,8 +44,7 @@ typedef struct {
int16_t
VARCOUNT
=
16
;
float
rand_f2
()
{
float
rand_f2
()
{
unsigned
r
=
taosRand
();
r
&=
0x007fffff
;
r
|=
0x40800000
;
...
...
@@ -53,10 +52,10 @@ float rand_f2()
}
static
const
int32_t
TEST_NUMBER
=
1
;
#define bigendian()
((*(char
*)&TEST_NUMBER) == 0)
#define bigendian()
((*(char
*)&TEST_NUMBER) == 0)
SSDataBlock
*
getSingleColDummyBlock
(
void
*
param
)
{
_info
*
pInfo
=
(
_info
*
)
param
;
_info
*
pInfo
=
(
_info
*
)
param
;
if
(
--
pInfo
->
count
<
0
)
{
return
NULL
;
}
...
...
@@ -65,11 +64,11 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
SColumnInfoData
colInfo
=
{
0
};
colInfo
.
info
.
type
=
pInfo
->
type
;
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_NCHAR
){
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
colInfo
.
info
.
bytes
=
TSDB_NCHAR_SIZE
*
VARCOUNT
+
VARSTR_HEADER_SIZE
;
}
else
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
}
else
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
colInfo
.
info
.
bytes
=
VARCOUNT
+
VARSTR_HEADER_SIZE
;
}
else
{
}
else
{
colInfo
.
info
.
bytes
=
tDataTypes
[
pInfo
->
type
].
bytes
;
}
colInfo
.
info
.
colId
=
1
;
...
...
@@ -80,39 +79,39 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
for
(
int32_t
i
=
0
;
i
<
pInfo
->
pageRows
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
static_cast
<
SColumnInfoData
*>
(
TARRAY_GET_ELEM
(
pBlock
->
pDataBlock
,
0
));
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_NCHAR
){
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
int32_t
size
=
taosRand
()
%
VARCOUNT
;
char
str
[
128
]
=
{
0
};
char
strOri
[
128
]
=
{
0
};
char
str
[
128
]
=
{
0
};
char
strOri
[
128
]
=
{
0
};
taosRandStr
(
strOri
,
size
);
int32_t
len
=
0
;
bool
ret
=
taosMbsToUcs4
(
strOri
,
size
,
(
TdUcs4
*
)
varDataVal
(
str
),
size
*
TSDB_NCHAR_SIZE
,
&
len
);
if
(
!
ret
){
bool
ret
=
taosMbsToUcs4
(
strOri
,
size
,
(
TdUcs4
*
)
varDataVal
(
str
),
size
*
TSDB_NCHAR_SIZE
,
&
len
);
if
(
!
ret
)
{
printf
(
"error
\n
"
);
return
NULL
;
}
varDataSetLen
(
str
,
len
);
colDataAppend
(
pColInfo
,
i
,
reinterpret_cast
<
const
char
*>
(
str
),
false
);
pBlock
->
info
.
hasVarCol
=
true
;
printf
(
"nchar: %s
\n
"
,
strOri
);
}
else
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
printf
(
"nchar: %s
\n
"
,
strOri
);
}
else
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
int32_t
size
=
taosRand
()
%
VARCOUNT
;
char
str
[
64
]
=
{
0
};
char
str
[
64
]
=
{
0
};
taosRandStr
(
varDataVal
(
str
),
size
);
varDataSetLen
(
str
,
size
);
colDataAppend
(
pColInfo
,
i
,
reinterpret_cast
<
const
char
*>
(
str
),
false
);
pBlock
->
info
.
hasVarCol
=
true
;
printf
(
"binary: %s
\n
"
,
varDataVal
(
str
));
}
else
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_DOUBLE
||
pInfo
->
type
==
TSDB_DATA_TYPE_FLOAT
)
{
}
else
if
(
pInfo
->
type
==
TSDB_DATA_TYPE_DOUBLE
||
pInfo
->
type
==
TSDB_DATA_TYPE_FLOAT
)
{
double
v
=
rand_f2
();
colDataAppend
(
pColInfo
,
i
,
reinterpret_cast
<
const
char
*>
(
&
v
),
false
);
printf
(
"float: %f
\n
"
,
v
);
}
else
{
}
else
{
int64_t
v
=
++
pInfo
->
startVal
;
char
*
result
=
static_cast
<
char
*>
(
taosMemoryCalloc
(
tDataTypes
[
pInfo
->
type
].
bytes
,
1
));
if
(
!
bigendian
()){
char
*
result
=
static_cast
<
char
*>
(
taosMemoryCalloc
(
tDataTypes
[
pInfo
->
type
].
bytes
,
1
));
if
(
!
bigendian
())
{
memcpy
(
result
,
&
v
,
tDataTypes
[
pInfo
->
type
].
bytes
);
}
else
{
}
else
{
memcpy
(
result
,
(
char
*
)(
&
v
)
+
sizeof
(
int64_t
)
-
tDataTypes
[
pInfo
->
type
].
bytes
,
tDataTypes
[
pInfo
->
type
].
bytes
);
}
...
...
@@ -126,17 +125,16 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
return
pBlock
;
}
int32_t
docomp
(
const
void
*
p1
,
const
void
*
p2
,
void
*
param
)
{
int32_t
pLeftIdx
=
*
(
int32_t
*
)
p1
;
int32_t
pRightIdx
=
*
(
int32_t
*
)
p2
;
int32_t
pLeftIdx
=
*
(
int32_t
*
)
p1
;
int32_t
pRightIdx
=
*
(
int32_t
*
)
p2
;
SMsortComparParam
*
pParam
=
(
SMsortComparParam
*
)
param
;
SSortSource
**
px
=
reinterpret_cast
<
SSortSource
**>
(
pParam
->
pSources
);
SMsortComparParam
*
pParam
=
(
SMsortComparParam
*
)
param
;
SSortSource
**
px
=
reinterpret_cast
<
SSortSource
**>
(
pParam
->
pSources
);
SArray
*
pInfo
=
pParam
->
orderInfo
;
SArray
*
pInfo
=
pParam
->
orderInfo
;
SSortSource
*
pLeftSource
=
px
[
pLeftIdx
];
SSortSource
*
pLeftSource
=
px
[
pLeftIdx
];
SSortSource
*
pRightSource
=
px
[
pRightIdx
];
// this input is exhausted, set the special value to denote this
...
...
@@ -151,36 +149,38 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
SSDataBlock
*
pLeftBlock
=
pLeftSource
->
src
.
pBlock
;
SSDataBlock
*
pRightBlock
=
pRightSource
->
src
.
pBlock
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInfo
->
size
;
++
i
)
{
SBlockOrderInfo
*
pOrder
=
(
SBlockOrderInfo
*
)
TARRAY_GET_ELEM
(
pInfo
,
i
);
SColumnInfoData
*
pLeftColInfoData
=
(
SColumnInfoData
*
)
TARRAY_GET_ELEM
(
pLeftBlock
->
pDataBlock
,
pOrder
->
slotId
);
bool
leftNull
=
false
;
bool
leftNull
=
false
;
if
(
pLeftColInfoData
->
hasNull
)
{
leftNull
=
colDataIsNull
(
pLeftColInfoData
,
pLeftBlock
->
info
.
rows
,
pLeftSource
->
src
.
rowIndex
,
pLeftBlock
->
pBlockAgg
[
pOrder
->
slotId
]);
leftNull
=
colDataIsNull
(
pLeftColInfoData
,
pLeftBlock
->
info
.
rows
,
pLeftSource
->
src
.
rowIndex
,
pLeftBlock
->
pBlockAgg
[
pOrder
->
slotId
]);
}
SColumnInfoData
*
pRightColInfoData
=
(
SColumnInfoData
*
)
TARRAY_GET_ELEM
(
pRightBlock
->
pDataBlock
,
pOrder
->
slotId
);
bool
rightNull
=
false
;
SColumnInfoData
*
pRightColInfoData
=
(
SColumnInfoData
*
)
TARRAY_GET_ELEM
(
pRightBlock
->
pDataBlock
,
pOrder
->
slotId
);
bool
rightNull
=
false
;
if
(
pRightColInfoData
->
hasNull
)
{
rightNull
=
colDataIsNull
(
pRightColInfoData
,
pRightBlock
->
info
.
rows
,
pRightSource
->
src
.
rowIndex
,
pRightBlock
->
pBlockAgg
[
pOrder
->
slotId
]);
rightNull
=
colDataIsNull
(
pRightColInfoData
,
pRightBlock
->
info
.
rows
,
pRightSource
->
src
.
rowIndex
,
pRightBlock
->
pBlockAgg
[
pOrder
->
slotId
]);
}
if
(
leftNull
&&
rightNull
)
{
continue
;
// continue to next slot
continue
;
// continue to next slot
}
if
(
rightNull
)
{
return
pOrder
->
nullFirst
?
1
:
-
1
;
return
pOrder
->
nullFirst
?
1
:
-
1
;
}
if
(
leftNull
)
{
return
pOrder
->
nullFirst
?
-
1
:
1
;
return
pOrder
->
nullFirst
?
-
1
:
1
;
}
void
*
left1
=
colDataGetData
(
pLeftColInfoData
,
pLeftSource
->
src
.
rowIndex
);
void
*
right1
=
colDataGetData
(
pRightColInfoData
,
pRightSource
->
src
.
rowIndex
);
void
*
left1
=
colDataGetData
(
pLeftColInfoData
,
pLeftSource
->
src
.
rowIndex
);
void
*
right1
=
colDataGetData
(
pRightColInfoData
,
pRightSource
->
src
.
rowIndex
);
__compar_fn_t
fn
=
getKeyComparFunc
(
pLeftColInfoData
->
info
.
type
,
pOrder
->
order
);
int
ret
=
fn
(
left1
,
right1
);
...
...
source/libs/executor/test/tSimpleHashTests.cpp
浏览文件 @
d5255f1e
...
...
@@ -31,8 +31,7 @@
// }
TEST
(
testCase
,
tSimpleHashTest_intKey
)
{
SSHashObj
*
pHashObj
=
tSimpleHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
));
SSHashObj
*
pHashObj
=
tSimpleHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
));
assert
(
pHashObj
!=
nullptr
);
...
...
@@ -76,10 +75,8 @@ TEST(testCase, tSimpleHashTest_intKey) {
tSimpleHashCleanup
(
pHashObj
);
}
TEST
(
testCase
,
tSimpleHashTest_binaryKey
)
{
SSHashObj
*
pHashObj
=
tSimpleHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
));
SSHashObj
*
pHashObj
=
tSimpleHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
));
assert
(
pHashObj
!=
nullptr
);
...
...
@@ -93,7 +90,7 @@ TEST(testCase, tSimpleHashTest_binaryKey) {
size_t
keyLen
=
sizeof
(
SCombineKey
);
size_t
dataLen
=
sizeof
(
int64_t
);
int64_t
originDataSum
=
0
;
int64_t
originDataSum
=
0
;
SCombineKey
combineKey
=
{
0
};
for
(
int64_t
i
=
1
;
i
<=
100
;
++
i
)
{
combineKey
.
suid
=
i
;
...
...
@@ -140,5 +137,4 @@ TEST(testCase, tSimpleHashTest_binaryKey) {
tSimpleHashCleanup
(
pHashObj
);
}
#pragma GCC diagnostic pop
\ No newline at end of file
source/libs/tfs/inc/tfsInt.h
浏览文件 @
d5255f1e
...
...
@@ -26,12 +26,14 @@
#include "tlog.h"
// For debug purpose
// clang-format off
#define fFatal(...) { if (fsDebugFlag & DEBUG_FATAL) { taosPrintLog("TFS FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define fError(...) { if (fsDebugFlag & DEBUG_ERROR) { taosPrintLog("TFS ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define fWarn(...) { if (fsDebugFlag & DEBUG_WARN) { taosPrintLog("TFS WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define fInfo(...) { if (fsDebugFlag & DEBUG_INFO) { taosPrintLog("TFS ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define fDebug(...) { if (fsDebugFlag & DEBUG_DEBUG) { taosPrintLog("TFS ", DEBUG_DEBUG, fsDebugFlag, __VA_ARGS__); }}
#define fTrace(...) { if (fsDebugFlag & DEBUG_TRACE) { taosPrintLog("TFS ", DEBUG_TRACE, fsDebugFlag, __VA_ARGS__); }}
// clang-format on
typedef
struct
{
int32_t
level
;
...
...
@@ -42,12 +44,12 @@ typedef struct {
typedef
struct
{
TdThreadSpinlock
lock
;
int32_t
level
;
int32_t
nextid
;
// next disk id to allocate
int32_t
ndisk
;
// # of disks mounted to this tier
int32_t
nAvailDisks
;
// # of Available disks
STfsDisk
*
disks
[
TFS_MAX_DISKS_PER_TIER
];
SDiskSize
size
;
int32_t
level
;
int32_t
nextid
;
// next disk id to allocate
int32_t
ndisk
;
// # of disks mounted to this tier
int32_t
nAvailDisks
;
// # of Available disks
STfsDisk
*
disks
[
TFS_MAX_DISKS_PER_TIER
];
SDiskSize
size
;
}
STfsTier
;
typedef
struct
{
...
...
@@ -65,10 +67,10 @@ typedef struct STfsDir {
typedef
struct
STfs
{
TdThreadSpinlock
lock
;
SDiskSize
size
;
int32_t
nlevel
;
STfsTier
tiers
[
TFS_MAX_TIERS
];
SHashObj
*
hash
;
// name to did map
SDiskSize
size
;
int32_t
nlevel
;
STfsTier
tiers
[
TFS_MAX_TIERS
];
SHashObj
*
hash
;
// name to did map
}
STfs
;
STfsDisk
*
tfsNewDisk
(
int32_t
level
,
int32_t
id
,
const
char
*
dir
);
...
...
@@ -82,15 +84,15 @@ void tfsUpdateTierSize(STfsTier *pTier);
int32_t
tfsAllocDiskOnTier
(
STfsTier
*
pTier
);
void
tfsPosNextId
(
STfsTier
*
pTier
);
#define tfsLockTier(pTier) taosThreadSpinLock(&(pTier)->lock)
#define tfsLockTier(pTier)
taosThreadSpinLock(&(pTier)->lock)
#define tfsUnLockTier(pTier) taosThreadSpinUnlock(&(pTier)->lock)
#define tfsLock(pTfs) taosThreadSpinLock(&(pTfs)->lock)
#define tfsLock(pTfs)
taosThreadSpinLock(&(pTfs)->lock)
#define tfsUnLock(pTfs) taosThreadSpinUnlock(&(pTfs)->lock)
#define TFS_TIER_AT(pTfs, level) (&(pTfs)->tiers[level])
#define TFS_DISK_AT(pTfs, did) ((pTfs)->tiers[(did).level].disks[(did).id])
#define TFS_PRIMARY_DISK(pTfs) ((pTfs)->tiers[0].disks[0])
#define TFS_DISK_AT(pTfs, did)
((pTfs)->tiers[(did).level].disks[(did).id])
#define TFS_PRIMARY_DISK(pTfs)
((pTfs)->tiers[0].disks[0])
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
...
...
source/libs/tfs/src/tfs.c
浏览文件 @
d5255f1e
...
...
@@ -213,7 +213,7 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
void
tfsAbsoluteName
(
STfs
*
pTfs
,
SDiskID
diskId
,
const
char
*
rname
,
char
*
aname
)
{
STfsDisk
*
pDisk
=
TFS_DISK_AT
(
pTfs
,
diskId
);
snprintf
(
aname
,
TSDB_FILENAME_LEN
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
rname
);
}
...
...
@@ -285,7 +285,7 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
int32_t
tfsRmdir
(
STfs
*
pTfs
,
const
char
*
rname
)
{
ASSERT
(
rname
[
0
]
!=
0
);
char
aname
[
TMPNAME_LEN
]
=
"
\0
"
;
for
(
int32_t
level
=
0
;
level
<
pTfs
->
nlevel
;
level
++
)
{
...
...
source/libs/tfs/test/tfsTest.cpp
浏览文件 @
d5255f1e
...
...
@@ -16,7 +16,7 @@
class
TfsTest
:
public
::
testing
::
Test
{
protected:
#ifdef _TD_DARWIN_64
#ifdef _TD_DARWIN_64
static
void
SetUpTestSuite
()
{
root
=
"/private"
TD_TMP_DIR_PATH
"tfsTest"
;
}
#else
static
void
SetUpTestSuite
()
{
root
=
TD_TMP_DIR_PATH
"tfsTest"
;
}
...
...
@@ -303,7 +303,7 @@ TEST_F(TfsTest, 04_File) {
TEST_F
(
TfsTest
,
05
_MultiDisk
)
{
int32_t
code
=
0
;
#ifdef _TD_DARWIN_64
#ifdef _TD_DARWIN_64
const
char
*
root00
=
"/private"
TD_TMP_DIR_PATH
"tfsTest00"
;
const
char
*
root01
=
"/private"
TD_TMP_DIR_PATH
"tfsTest01"
;
const
char
*
root10
=
"/private"
TD_TMP_DIR_PATH
"tfsTest10"
;
...
...
tools/scripts/codeFormat.sh
浏览文件 @
d5255f1e
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录