Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c1605db6
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c1605db6
编写于
12月 26, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
0d11825d
9fe46510
变更
28
展开全部
隐藏空白更改
内联
并排
Showing
28 changed file
with
1676 addition
and
391 deletion
+1676
-391
CMakeLists.txt
CMakeLists.txt
+1
-0
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+2
-2
include/libs/index/index.h
include/libs/index/index.h
+12
-0
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+10
-2
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+27
-16
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+526
-32
source/libs/index/CMakeLists.txt
source/libs/index/CMakeLists.txt
+2
-0
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+28
-31
source/libs/index/inc/index_cache.h
source/libs/index/inc/index_cache.h
+16
-9
source/libs/index/inc/index_fst_automation.h
source/libs/index/inc/index_fst_automation.h
+1
-1
source/libs/index/inc/index_tfile.h
source/libs/index/inc/index_tfile.h
+4
-0
source/libs/index/src/index.c
source/libs/index/src/index.c
+46
-34
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+74
-64
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+8
-6
source/libs/index/src/index_fst_automation.c
source/libs/index/src/index_fst_automation.c
+25
-1
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+14
-12
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+22
-4
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+174
-0
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+23
-5
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+17
-16
source/libs/qworker/CMakeLists.txt
source/libs/qworker/CMakeLists.txt
+2
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+5
-0
source/libs/qworker/test/CMakeLists.txt
source/libs/qworker/test/CMakeLists.txt
+18
-0
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+120
-0
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+20
-9
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+183
-89
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+295
-57
未找到文件。
CMakeLists.txt
浏览文件 @
c1605db6
...
...
@@ -11,6 +11,7 @@ set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib")
include
(
${
CMAKE_SUPPORT_DIR
}
/cmake.options
)
SET
(
CMAKE_C_FLAGS
"
${
CMAKE_C_FLAGS
}
-fPIC -gdwarf-2 -msse4.2 -mfma -g3"
)
SET
(
CMAKE_CXX_FLAGS
"
${
CMAKE_CXX_FLAGS
}
-fPIC -gdwarf-2 -msse4.2 -mfma -g3"
)
# contrib
add_subdirectory
(
contrib
)
...
...
include/libs/catalog/catalog.h
浏览文件 @
c1605db6
...
...
@@ -110,7 +110,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
);
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
*
pVgroupList
);
/**
* Get a table's vgroup from its name's hash value.
...
...
@@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
S
EpSet
*
pQnodeEpSe
t
);
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
S
Array
*
pQnodeLis
t
);
...
...
include/libs/index/index.h
浏览文件 @
c1605db6
...
...
@@ -85,6 +85,18 @@ SIndexTerm* indexTermCreate(int64_t suid,
int32_t
nColVal
);
void
indexTermDestroy
(
SIndexTerm
*
p
);
/*
* init index
*
*/
int32_t
indexInit
();
/*
* destory index
*
*/
void
indexCleanUp
();
#ifdef __cplusplus
}
#endif
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
c1605db6
...
...
@@ -59,7 +59,15 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param qnodeList Qnode address list, element is SEpAddr
* @return
*/
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
uint64_t
*
numOfRows
);
/**
* Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety.
* @param qnodeList Qnode address list, element is SEpAddr
* @return
*/
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
**
data
);
...
...
@@ -79,4 +87,4 @@ void schedulerDestroy(void);
}
#endif
#endif
/*_TD_SCHEDULER_H_*/
\ No newline at end of file
#endif
/*_TD_SCHEDULER_H_*/
source/client/src/clientImpl.c
浏览文件 @
c1605db6
...
...
@@ -209,7 +209,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
}
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
return
scheduleExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
/*todo appInfo.xxx*/
,
pDag
,
pJob
);
return
schedule
Async
ExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
/*todo appInfo.xxx*/
,
pDag
,
pJob
);
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
c1605db6
...
...
@@ -197,15 +197,21 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetVgInfoFromDB
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SDBVgroupInfo
*
dbInfo
,
SArray
*
vgroupList
)
{
int32_t
ctgGetVgInfoFromDB
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SDBVgroupInfo
*
dbInfo
,
SArray
*
*
vgroupList
)
{
SHashObj
*
vgroupHash
=
NULL
;
SVgroupInfo
*
vgInfo
=
NULL
;
*
vgroupList
=
taosArrayInit
(
taosHashGetSize
(
dbInfo
->
vgInfo
),
sizeof
(
SVgroupInfo
));
if
(
NULL
==
*
vgroupList
)
{
ctgError
(
"taosArrayInit failed"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
void
*
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
NULL
);
while
(
pIter
)
{
vgInfo
=
pIter
;
if
(
NULL
==
taosArrayPush
(
vgroupList
,
vgInfo
))
{
if
(
NULL
==
taosArrayPush
(
*
vgroupList
,
vgInfo
))
{
ctgError
(
"taosArrayPush failed"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
...
...
@@ -295,14 +301,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
pCatalog
->
tableCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
ctgError
(
"init hash[%d] for tablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
pCatalog
->
tableCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
...
...
@@ -329,7 +327,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
}
}
if
(
taosHashPut
(
pCatalog
->
tableCache
.
cache
,
output
->
tbFname
,
strlen
(
output
->
tbFname
),
output
->
tbMeta
,
sizeof
(
*
output
->
tbMeta
))
!=
0
)
{
int32_t
tbSize
=
sizeof
(
*
output
->
tbMeta
)
+
sizeof
(
SSchema
)
*
(
output
->
tbMeta
->
tableInfo
.
numOfColumns
+
output
->
tbMeta
->
tableInfo
.
numOfTags
);
if
(
taosHashPut
(
pCatalog
->
tableCache
.
cache
,
output
->
tbFname
,
strlen
(
output
->
tbFname
),
output
->
tbMeta
,
tbSize
)
!=
0
)
{
ctgError
(
"push table[%s] to table cache failed"
,
output
->
tbFname
);
goto
error_exit
;
}
...
...
@@ -529,7 +528,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
return
ctgGetTableMetaImpl
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
true
,
pTableMeta
);
}
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
)
{
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
*
pVgroupList
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
pVgroupList
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
@@ -549,17 +548,29 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
int32_t
vgId
=
tbMeta
->
vgId
;
if
(
NULL
==
taosHashGetClone
(
dbVgroup
.
vgInfo
,
&
vgId
,
sizeof
(
vgId
),
&
vgroupInfo
))
{
ctgError
(
"vgId[%d] not found in vgroup list"
,
vgId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
*
pVgroupList
=
taosArrayInit
(
1
,
sizeof
(
SVgroupInfo
));
if
(
NULL
==
*
pVgroupList
)
{
ctgError
(
"taosArrayInit failed"
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
==
taosArrayPush
(
pVgroupList
,
&
vgroupInfo
))
{
if
(
NULL
==
taosArrayPush
(
*
pVgroupList
,
&
vgroupInfo
))
{
ctgError
(
"push vgroupInfo to array failed"
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
tfree
(
tbMeta
);
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
tbMeta
);
taosArrayDestroy
(
*
pVgroupList
);
CTG_RET
(
code
);
}
...
...
@@ -634,8 +645,8 @@ _return:
CTG_RET
(
code
);
}
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
S
EpSet
*
pQnodeEpSe
t
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pQnode
EpSe
t
)
{
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
S
Array
*
pQnodeLis
t
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pQnode
Lis
t
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
c1605db6
...
...
@@ -32,27 +32,28 @@
#include "stub.h"
#include "addr_any.h"
typedef
struct
SAppInstInfo
{
int64_t
numOfConns
;
SCorEpSet
mgmtEp
;
}
SAppInstInfo
;
typedef
struct
STscObj
{
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
char
acctId
[
TSDB_ACCT_ID_LEN
];
char
db
[
TSDB_ACCT_ID_LEN
+
TSDB_DB_NAME_LEN
];
uint32_t
connId
;
uint64_t
id
;
// ref ID returned by taosAddRef
// struct SSqlObj *sqlList;
void
*
pTransporter
;
pthread_mutex_t
mutex
;
// used to protect the operation on db
int32_t
numOfReqs
;
// number of sqlObj from this tscObj
SAppInstInfo
*
pAppInfo
;
}
STscObj
;
namespace
{
void
ctgTestSetPrepareTableMeta
();
void
ctgTestSetPrepareCTableMeta
();
void
ctgTestSetPrepareSTableMeta
();
int32_t
ctgTestVgNum
=
10
;
int32_t
ctgTestColNum
=
2
;
int32_t
ctgTestTagNum
=
1
;
int32_t
ctgTestSVersion
=
1
;
int32_t
ctgTestTVersion
=
1
;
char
*
ctgTestClusterId
=
"cluster1"
;
char
*
ctgTestDbname
=
"1.db1"
;
char
*
ctgTestTablename
=
"table1"
;
char
*
ctgTestCTablename
=
"ctable1"
;
char
*
ctgTestSTablename
=
"stable1"
;
void
sendCreateDbMsg
(
void
*
shandle
,
SEpSet
*
pEpSet
)
{
SCreateDbMsg
*
pReq
=
(
SCreateDbMsg
*
)
rpcMallocCont
(
sizeof
(
SCreateDbMsg
));
strcpy
(
pReq
->
db
,
"1.db1"
);
...
...
@@ -88,22 +89,281 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
ASSERT_EQ
(
rpcRsp
.
code
,
0
);
}
void
__rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
void
ctgTestPrepareDbVgroups
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
SUseDbRsp
*
rspMsg
=
NULL
;
//todo
pRsp
->
code
=
0
;
pRsp
->
contLen
=
sizeof
(
SUseDbRsp
)
+
ctgTestVgNum
*
sizeof
(
SVgroupInfo
);
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
SUseDbRsp
*
)
pRsp
->
pCont
;
strcpy
(
rspMsg
->
db
,
ctgTestDbname
);
rspMsg
->
vgVersion
=
htonl
(
1
);
rspMsg
->
vgNum
=
htonl
(
ctgTestVgNum
);
rspMsg
->
hashMethod
=
0
;
SVgroupInfo
*
vg
=
NULL
;
uint32_t
hashUnit
=
UINT32_MAX
/
ctgTestVgNum
;
for
(
int32_t
i
=
0
;
i
<
ctgTestVgNum
;
++
i
)
{
vg
=
&
rspMsg
->
vgroupInfo
[
i
];
vg
->
vgId
=
htonl
(
i
+
1
);
vg
->
hashBegin
=
htonl
(
i
*
hashUnit
);
vg
->
hashEnd
=
htonl
(
hashUnit
*
(
i
+
1
)
-
1
);
vg
->
numOfEps
=
i
%
TSDB_MAX_REPLICA
+
1
;
vg
->
inUse
=
i
%
vg
->
numOfEps
;
for
(
int32_t
n
=
0
;
n
<
vg
->
numOfEps
;
++
n
)
{
SEpAddrMsg
*
addr
=
&
vg
->
epAddr
[
n
];
strcpy
(
addr
->
fqdn
,
"a0"
);
addr
->
port
=
htons
(
n
+
22
);
}
}
vg
->
hashEnd
=
htonl
(
UINT32_MAX
);
return
;
}
void
ctgTestPrepareTableMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
STableMetaMsg
*
rspMsg
=
NULL
;
//todo
pRsp
->
code
=
0
;
pRsp
->
contLen
=
sizeof
(
STableMetaMsg
)
+
(
ctgTestColNum
+
ctgTestTagNum
)
*
sizeof
(
SSchema
);
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
STableMetaMsg
*
)
pRsp
->
pCont
;
sprintf
(
rspMsg
->
tbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestTablename
);
rspMsg
->
numOfTags
=
0
;
rspMsg
->
numOfColumns
=
htonl
(
ctgTestColNum
);
rspMsg
->
precision
=
1
;
rspMsg
->
tableType
=
TSDB_NORMAL_TABLE
;
rspMsg
->
update
=
1
;
rspMsg
->
sversion
=
htonl
(
ctgTestSVersion
);
rspMsg
->
tversion
=
htonl
(
ctgTestTVersion
);
rspMsg
->
suid
=
0
;
rspMsg
->
tuid
=
htobe64
(
0x0000000000000001
);
rspMsg
->
vgId
=
htonl
(
8
);
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1"
);
return
;
}
void
ctgTestPrepareCTableMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
STableMetaMsg
*
rspMsg
=
NULL
;
//todo
pRsp
->
code
=
0
;
pRsp
->
contLen
=
sizeof
(
STableMetaMsg
)
+
(
ctgTestColNum
+
ctgTestTagNum
)
*
sizeof
(
SSchema
);
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
STableMetaMsg
*
)
pRsp
->
pCont
;
sprintf
(
rspMsg
->
tbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestCTablename
);
sprintf
(
rspMsg
->
stbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestSTablename
);
rspMsg
->
numOfTags
=
htonl
(
ctgTestTagNum
);
rspMsg
->
numOfColumns
=
htonl
(
ctgTestColNum
);
rspMsg
->
precision
=
1
;
rspMsg
->
tableType
=
TSDB_CHILD_TABLE
;
rspMsg
->
update
=
1
;
rspMsg
->
sversion
=
htonl
(
ctgTestSVersion
);
rspMsg
->
tversion
=
htonl
(
ctgTestTVersion
);
rspMsg
->
suid
=
htobe64
(
0x0000000000000002
);
rspMsg
->
tuid
=
htobe64
(
0x0000000000000003
);
rspMsg
->
vgId
=
htonl
(
9
);
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1s"
);
s
=
&
rspMsg
->
pSchema
[
2
];
s
->
type
=
TSDB_DATA_TYPE_BINARY
;
s
->
colId
=
htonl
(
2
);
s
->
bytes
=
htonl
(
12
);
strcpy
(
s
->
name
,
"tag1s"
);
return
;
}
void
ctgTestPrepareSTableMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
STableMetaMsg
*
rspMsg
=
NULL
;
//todo
pRsp
->
code
=
0
;
pRsp
->
contLen
=
sizeof
(
STableMetaMsg
)
+
(
ctgTestColNum
+
ctgTestTagNum
)
*
sizeof
(
SSchema
);
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
STableMetaMsg
*
)
pRsp
->
pCont
;
sprintf
(
rspMsg
->
tbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestSTablename
);
sprintf
(
rspMsg
->
stbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestSTablename
);
rspMsg
->
numOfTags
=
htonl
(
ctgTestTagNum
);
rspMsg
->
numOfColumns
=
htonl
(
ctgTestColNum
);
rspMsg
->
precision
=
1
;
rspMsg
->
tableType
=
TSDB_SUPER_TABLE
;
rspMsg
->
update
=
1
;
rspMsg
->
sversion
=
htonl
(
ctgTestSVersion
);
rspMsg
->
tversion
=
htonl
(
ctgTestTVersion
);
rspMsg
->
suid
=
htobe64
(
0x0000000000000002
);
rspMsg
->
tuid
=
htobe64
(
0x0000000000000003
);
rspMsg
->
vgId
=
0
;
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1s"
);
s
=
&
rspMsg
->
pSchema
[
2
];
s
->
type
=
TSDB_DATA_TYPE_BINARY
;
s
->
colId
=
htonl
(
2
);
s
->
bytes
=
htonl
(
12
);
strcpy
(
s
->
name
,
"tag1s"
);
return
;
}
void
ctgTestPrepareDbVgroupsAndNormalMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareTableMeta
();
return
;
}
void
ctgTestPrepareDbVgroupsAndChildMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareCTableMeta
();
return
;
}
void
ctgTestPrepareDbVgroupsAndSuperMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareSTableMeta
();
return
;
}
void
ctgTestSetPrepareDbVgroups
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroups
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroups
);
}
}
}
void
ctgTestSetPrepareTableMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareTableMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareTableMeta
);
}
}
}
void
ctgTestSetPrepareCTableMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareCTableMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareCTableMeta
);
}
}
}
void
ctgTestSetPrepareSTableMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareSTableMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareSTableMeta
);
}
}
}
void
ctgTestSetPrepareDbVgroupsAndNormalMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndNormalMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndNormalMeta
);
}
}
}
void
ctgTestSetPrepareDbVgroupsAndChildMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndChildMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndChildMeta
);
}
}
}
void
initTestEnv
()
{
void
ctgTestSetPrepareDbVgroupsAndSuperMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
__rpcSendRecv
);
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndSuperMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
__rpcSendRecv
);
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndSuperMeta
);
}
}
}
...
...
@@ -111,33 +371,267 @@ void initTestEnv() {
}
TEST
(
testCase
,
normalCase
)
{
STscObj
*
pConn
=
(
STscObj
*
)
taos_connect
(
"127.0.0.1"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TEST
(
tableMeta
,
normalTable
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
ctgTestSetPrepareDbVgroups
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableHashVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
vgInfo
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
vgInfo
.
vgId
,
8
);
ASSERT_EQ
(
vgInfo
.
numOfEps
,
3
);
ctgTestSetPrepareTableMeta
();
STableMeta
*
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
8
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_NORMAL_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
0
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
8
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_NORMAL_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
0
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
catalogDestroy
();
}
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
TEST
(
tableMeta
,
childTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
ctgTestSetPrepareDbVgroupsAndChildMeta
();
initQueryModuleMsgHandle
();
sendCreateDbMsg
(
pConn
->
pTransporter
,
&
pConn
->
pAppInfo
->
mgmtEp
.
epSet
);
//
sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
clusterId
,
&
pCtg
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
STableMeta
*
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
code
=
catalogGetTableHashVgroup
(
pCtg
,
pConn
->
pTransporter
,
&
pConn
->
pAppInfo
->
mgmtEp
.
epSet
,
dbname
,
tablename
,
&
vgInfo
);
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestSTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
0
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_SUPER_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
taos_close
(
pConn
);
catalogDestroy
(
);
}
TEST
(
tableMeta
,
superTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
ctgTestSetPrepareDbVgroupsAndSuperMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
STableMeta
*
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestSTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
0
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_SUPER_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
ctgTestSetPrepareCTableMeta
();
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
tableMeta
=
NULL
;
code
=
catalogRenewAndGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
normalTable
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndNormalMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
8
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
3
);
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
childTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndChildMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
9
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
4
);
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
superTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndSuperMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestSTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
10
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
1
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
1
);
ASSERT_EQ
(
vgInfo
->
vgId
,
2
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
2
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
2
);
ASSERT_EQ
(
vgInfo
->
vgId
,
3
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
3
);
catalogDestroy
();
}
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
...
...
source/libs/index/CMakeLists.txt
浏览文件 @
c1605db6
...
...
@@ -3,7 +3,9 @@ add_library(index ${INDEX_SRC})
target_include_directories
(
index
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_link_libraries
(
index
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
c1605db6
...
...
@@ -49,7 +49,6 @@ struct SIndex {
SHashObj
*
colObj
;
// < field name, field id>
int64_t
suid
;
// current super table id, -1 is normal table
int
colId
;
// field id allocated to cache
int32_t
cVersion
;
// current version allocated to cache
SIndexStat
stat
;
...
...
@@ -88,41 +87,39 @@ typedef struct SIndexTermQuery {
EIndexQueryType
qType
;
}
SIndexTermQuery
;
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLog("index FATAL ", 255, __VA_ARGS__); \
} \
typedef
struct
Iterate
{
void
*
iter
;
int8_t
type
;
char
*
colVal
;
SArray
*
val
;
}
Iterate
;
extern
void
*
indexQhandle
;
int
indexFlushCacheTFile
(
SIndex
*
sIdx
,
void
*
);
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
} while (0)
#define indexError(...) \
do { \
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLog("index ERROR ", 255, __VA_ARGS__); \
} \
#define indexError(...) \
do { \
if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \
} while (0)
#define indexWarn(...) \
do { \
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("index WARN ", 255, __VA_ARGS__); \
} \
#define indexWarn(...) \
do { \
if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \
} while (0)
#define indexInfo(...) \
do { \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("index ", 255, __VA_ARGS__); \
} \
#define indexInfo(...) \
do { \
if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \
} while (0)
#define indexDebug(...) \
do { \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
#define indexDebug(...) \
do { \
if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \
} while (0)
#define indexTrace(...) \
do { \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
#define indexTrace(...) \
do { \
if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \
} while (0)
#ifdef __cplusplus
...
...
source/libs/index/inc/index_cache.h
浏览文件 @
c1605db6
...
...
@@ -22,10 +22,8 @@
// ----------------- key structure in skiplist ---------------------
/* A data row, the format is like below:
* content: |<--totalLen-->|<-- fieldid-->|<--field type-->|<-- value len--->|
* |<-- value -->|<--uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|
* <--valuelen->|<--uint64_t->| * <-- int32_t-->|<-- int8_t --->|
* content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
*/
#ifdef __cplusplus
...
...
@@ -34,12 +32,17 @@ extern "C" {
typedef
struct
IndexCache
{
T_REF_DECLARE
()
SSkipList
*
skiplist
;
SSkipList
*
mem
,
*
imm
;
SIndex
*
index
;
char
*
colName
;
int32_t
version
;
int32_t
nTerm
;
int8_t
type
;
}
IndexCache
;
typedef
struct
CacheTerm
{
// key
int32_t
colId
;
int32_t
nColVal
;
char
*
colVal
;
int32_t
version
;
...
...
@@ -49,14 +52,18 @@ typedef struct CacheTerm {
SIndexOperOnColumn
operaType
;
}
CacheTerm
;
//
IndexCache
*
indexCacheCreate
();
IndexCache
*
indexCacheCreate
(
SIndex
*
idx
,
const
char
*
colName
,
int8_t
type
);
void
indexCacheDestroy
(
void
*
cache
);
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
);
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
);
// int indexCacheGet(void *cache, uint64_t *rst);
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
);
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SArray
*
result
,
STermValueType
*
s
);
void
indexCacheRef
(
IndexCache
*
cache
);
void
indexCacheUnRef
(
IndexCache
*
cache
);
void
indexCacheDebug
(
IndexCache
*
cache
);
#ifdef __cplusplus
...
...
source/libs/index/inc/index_fst_automation.h
浏览文件 @
c1605db6
...
...
@@ -23,7 +23,7 @@ extern "C" {
typedef
struct
AutomationCtx
AutomationCtx
;
typedef
enum
AutomationType
{
AUTOMATION_PREFIX
,
AUTMMATION_MATCH
}
AutomationType
;
typedef
enum
AutomationType
{
AUTOMATION_
ALWAYS
,
AUTOMATION_
PREFIX
,
AUTMMATION_MATCH
}
AutomationType
;
typedef
struct
StartWith
{
AutomationCtx
*
autoSelf
;
...
...
source/libs/index/inc/index_tfile.h
浏览文件 @
c1605db6
...
...
@@ -105,9 +105,13 @@ void tfileCacheDestroy(TFileCache* tcache);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
);
void
tfileCachePut
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
,
TFileReader
*
reader
);
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
char
*
colName
);
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
);
void
tfileReaderDestroy
(
TFileReader
*
reader
);
int
tfileReaderSearch
(
TFileReader
*
reader
,
SIndexTermQuery
*
query
,
SArray
*
result
);
void
tfileReaderRef
(
TFileReader
*
reader
);
void
tfileReaderUnRef
(
TFileReader
*
reader
);
TFileWriter
*
tfileWriterCreate
(
WriterCtx
*
ctx
,
TFileHeader
*
header
);
void
tfileWriterDestroy
(
TFileWriter
*
tw
);
...
...
source/libs/index/src/index.c
浏览文件 @
c1605db6
...
...
@@ -18,11 +18,26 @@
#include "index_cache.h"
#include "index_tfile.h"
#include "tdef.h"
#include "tsched.h"
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
#endif
#define INDEX_NUM_OF_THREADS 4
#define INDEX_QUEUE_SIZE 4
void
*
indexQhandle
=
NULL
;
int32_t
indexInit
()
{
indexQhandle
=
taosInitScheduler
(
INDEX_QUEUE_SIZE
,
INDEX_NUM_OF_THREADS
,
"index"
);
return
indexQhandle
==
NULL
?
-
1
:
0
;
// do nothing
}
void
indexCleanUp
()
{
taosCleanUpScheduler
(
indexQhandle
);
}
static
int
uidCompare
(
const
void
*
a
,
const
void
*
b
)
{
uint64_t
u1
=
*
(
uint64_t
*
)
a
;
uint64_t
u2
=
*
(
uint64_t
*
)
b
;
...
...
@@ -38,16 +53,15 @@ typedef struct SIdxColInfo {
}
SIdxColInfo
;
static
pthread_once_t
isInit
=
PTHREAD_ONCE_INIT
;
static
void
indexInit
();
//
static void indexInit();
static
int
indexTermSearch
(
SIndex
*
sIdx
,
SIndexTermQuery
*
term
,
SArray
**
result
);
static
int
indexFlushCacheTFile
(
SIndex
*
sIdx
);
static
void
indexInterResultsDestroy
(
SArray
*
results
);
static
int
indexMergeFinalResults
(
SArray
*
interResults
,
EIndexOperatorType
oType
,
SArray
*
finalResult
);
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
pthread_once
(
&
isInit
,
indexInit
);
//
pthread_once(&isInit, indexInit);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
...
...
@@ -57,10 +71,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#endif
#ifdef USE_INVERTED_INDEX
sIdx
->
cache
=
(
void
*
)
indexCacheCreate
(
);
sIdx
->
tindex
=
NULL
;
// sIdx->cache = (void*)indexCacheCreate(sIdx
);
sIdx
->
tindex
=
indexTFileCreate
(
path
)
;
sIdx
->
colObj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
colId
=
1
;
sIdx
->
cVersion
=
1
;
pthread_mutex_init
(
&
sIdx
->
mtx
,
NULL
);
...
...
@@ -80,6 +93,12 @@ void indexClose(SIndex* sIdx) {
#ifdef USE_INVERTED_INDEX
indexCacheDestroy
(
sIdx
->
cache
);
void
*
iter
=
taosHashIterate
(
sIdx
->
colObj
,
NULL
);
while
(
iter
)
{
IndexCache
**
pCache
=
iter
;
if
(
*
pCache
)
{
indexCacheUnRef
(
*
pCache
);
}
iter
=
taosHashIterate
(
sIdx
->
colObj
,
iter
);
}
taosHashCleanup
(
sIdx
->
colObj
);
pthread_mutex_destroy
(
&
sIdx
->
mtx
);
#endif
...
...
@@ -110,29 +129,24 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
pthread_mutex_lock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdxColInfo
*
fi
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
if
(
fi
==
NULL
)
{
SIdxColInfo
tfi
=
{.
colId
=
index
->
colId
};
index
->
cVersion
++
;
index
->
colId
++
;
taosHashPut
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
,
&
tfi
,
sizeof
(
tfi
));
}
else
{
// TODO, del
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
if
(
*
cache
==
NULL
)
{
IndexCache
*
pCache
=
indexCacheCreate
(
index
,
p
->
colName
,
p
->
colType
);
taosHashPut
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
,
&
pCache
,
sizeof
(
void
*
));
}
}
pthread_mutex_unlock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdxColInfo
*
fi
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
assert
(
fi
!=
NULL
);
int32_t
colId
=
fi
->
colId
;
int32_t
version
=
index
->
cVersion
;
int
ret
=
indexCachePut
(
index
->
cache
,
p
,
colId
,
version
,
uid
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
assert
(
*
cache
!=
NULL
);
int
ret
=
indexCachePut
(
*
cache
,
p
,
uid
);
if
(
ret
!=
0
)
{
return
ret
;
}
}
#endif
#endif
return
0
;
}
int
indexSearch
(
SIndex
*
index
,
SIndexMultiTermQuery
*
multiQuerys
,
SArray
*
result
)
{
...
...
@@ -281,32 +295,26 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms) {
taosArrayDestroy
(
terms
);
}
void
indexInit
()
{
// do nothing
}
static
int
indexTermSearch
(
SIndex
*
sIdx
,
SIndexTermQuery
*
query
,
SArray
**
result
)
{
int32_t
version
=
-
1
;
int16_t
colId
=
-
1
;
SIdxColInfo
*
colInfo
=
NULL
;
SIndexTerm
*
term
=
query
->
term
;
const
char
*
colName
=
term
->
colName
;
int32_t
nColName
=
term
->
nColName
;
// Get col info
IndexCache
*
cache
=
NULL
;
pthread_mutex_lock
(
&
sIdx
->
mtx
);
colInfo
=
taosHashGet
(
sIdx
->
colObj
,
colName
,
nColName
);
if
(
colInfo
==
NULL
)
{
IndexCache
**
pCache
=
taosHashGet
(
sIdx
->
colObj
,
colName
,
nColName
);
if
(
*
pCache
==
NULL
)
{
pthread_mutex_unlock
(
&
sIdx
->
mtx
);
return
-
1
;
}
colId
=
colInfo
->
colId
;
version
=
colInfo
->
cVersion
;
cache
=
*
pCache
;
pthread_mutex_unlock
(
&
sIdx
->
mtx
);
*
result
=
taosArrayInit
(
4
,
sizeof
(
uint64_t
));
// TODO: iterator mem and tidex
STermValueType
s
;
if
(
0
==
indexCacheSearch
(
sIdx
->
cache
,
query
,
colId
,
version
,
*
result
,
&
s
))
{
if
(
0
==
indexCacheSearch
(
cache
,
query
,
*
result
,
&
s
))
{
if
(
s
==
kTypeDeletion
)
{
indexInfo
(
"col: %s already drop by other opera"
,
term
->
colName
);
// coloum already drop by other oper, no need to query tindex
...
...
@@ -353,10 +361,14 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
}
return
0
;
}
static
int
indexFlushCacheTFile
(
SIndex
*
sIdx
)
{
int
indexFlushCacheTFile
(
SIndex
*
sIdx
,
void
*
cache
)
{
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
indexWarn
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
IndexCache
*
pCache
=
(
IndexCache
*
)
cache
;
TFileReader
*
pReader
=
tfileGetReaderByCol
(
sIdx
->
tindex
,
pCache
->
colName
);
tfileReaderUnRef
(
pReader
);
indexCacheUnRef
(
pCache
);
return
0
;
}
source/libs/index/src/index_cache.c
浏览文件 @
c1605db6
...
...
@@ -16,9 +16,11 @@
#include "index_cache.h"
#include "index_util.h"
#include "tcompare.h"
#include "tsched.h"
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define CACH_LIMIT 1000000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
...
...
@@ -38,9 +40,6 @@ static int32_t compareKey(const void* l, const void* r) {
CacheTerm
*
lt
=
(
CacheTerm
*
)
l
;
CacheTerm
*
rt
=
(
CacheTerm
*
)
r
;
// compare colId
if
(
lt
->
colId
!=
rt
->
colId
)
{
return
lt
->
colId
-
rt
->
colId
;
}
// compare colVal
int
i
,
j
;
for
(
i
=
0
,
j
=
0
;
i
<
lt
->
nColVal
&&
j
<
rt
->
nColVal
;
i
++
,
j
++
)
{
...
...
@@ -56,71 +55,40 @@ static int32_t compareKey(const void* l, const void* r) {
return
-
1
;
}
// compare version
return
rt
->
version
-
lt
->
version
;
}
// char* lp = (char*)l;
// char* rp = (char*)r;
//// compare col id
// int16_t lf, rf; // cold id
// memcpy(&lf, lp, sizeof(lf));
// memcpy(&rf, rp, sizeof(rf));
// if (lf != rf) { return lf < rf ? -1 : 1; }
// lp += sizeof(lf);
// rp += sizeof(rf);
//// skip value len
// int32_t lfl, rfl;
// memcpy(&lfl, lp, sizeof(lfl));
// memcpy(&rfl, rp, sizeof(rfl));
// lp += sizeof(lfl);
// rp += sizeof(rfl);
//// compare value
// int32_t i, j;
// for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) {
// if (lp[i] == rp[j]) {
// continue;
// } else {
// return lp[i] < rp[j] ? -1 : 1;
// }
//}
// if (i < lfl) {
// return 1;
//} else if (j < rfl) {
// return -1;
//}
// lp += lfl;
// rp += rfl;
//// compare version, desc order
// int32_t lv, rv;
// memcpy(&lv, lp, sizeof(lv));
// memcpy(&rv, rp, sizeof(rv));
// if (lv != rv) { return lv < rv ? 1 : -1; }
// return 0;
static
SSkipList
*
indexInternalCacheCreate
(
int8_t
type
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
)
{
return
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
type
,
MAX_INDEX_KEY_LEN
,
compareKey
,
SL_ALLOW_DUP_KEY
,
getIndexKey
);
}
}
IndexCache
*
indexCacheCreate
()
{
IndexCache
*
indexCacheCreate
(
SIndex
*
idx
,
const
char
*
colName
,
int8_t
type
)
{
IndexCache
*
cache
=
calloc
(
1
,
sizeof
(
IndexCache
));
if
(
cache
==
NULL
)
{
indexError
(
"failed to create index cache"
);
return
NULL
;
}
cache
->
skiplist
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
MAX_INDEX_KEY_LEN
,
compareKey
,
SL_ALLOW_DUP_KEY
,
getIndexKey
);
};
cache
->
mem
=
indexInternalCacheCreate
(
type
);
cache
->
colName
=
calloc
(
1
,
strlen
(
colName
)
+
1
);
memcpy
(
cache
->
colName
,
colName
,
strlen
(
colName
));
cache
->
type
=
type
;
cache
->
index
=
idx
;
cache
->
version
=
0
;
indexCacheRef
(
cache
);
return
cache
;
}
void
indexCacheDebug
(
IndexCache
*
cache
)
{
SSkipListIterator
*
iter
=
tSkipListCreateIter
(
cache
->
skiplist
);
SSkipListIterator
*
iter
=
tSkipListCreateIter
(
cache
->
mem
);
while
(
tSkipListIterNext
(
iter
))
{
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
CacheTerm
*
ct
=
(
CacheTerm
*
)
SL_GET_NODE_DATA
(
node
);
if
(
ct
!=
NULL
)
{
// TODO, add more debug info
indexInfo
(
"{col
Id:%d, colVal: %s, version: %d}
\t
"
,
ct
->
colId
,
ct
->
colVal
,
ct
->
version
);
indexInfo
(
"{col
Val: %s, version: %d}
\t
"
,
ct
->
colVal
,
ct
->
version
);
}
}
tSkipListDestroyIter
(
iter
);
...
...
@@ -129,37 +97,71 @@ void indexCacheDebug(IndexCache* cache) {
void
indexCacheDestroy
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
if
(
pCache
==
NULL
)
{
return
;
}
tSkipListDestroy
(
pCache
->
skiplist
);
tSkipListDestroy
(
pCache
->
mem
);
tSkipListDestroy
(
pCache
->
imm
);
free
(
pCache
->
colName
);
free
(
pCache
);
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
)
{
static
void
doMergeWork
(
SSchedMsg
*
msg
)
{
IndexCache
*
pCache
=
msg
->
ahandle
;
SIndex
*
sidx
=
(
SIndex
*
)
pCache
->
index
;
indexFlushCacheTFile
(
sidx
,
pCache
);
}
int
indexCacheSchedToMerge
(
IndexCache
*
pCache
)
{
SSchedMsg
schedMsg
=
{
0
};
schedMsg
.
fp
=
doMergeWork
;
schedMsg
.
ahandle
=
pCache
;
schedMsg
.
thandle
=
NULL
;
schedMsg
.
msg
=
NULL
;
taosScheduleTask
(
indexQhandle
,
&
schedMsg
);
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
IndexCache
*
pCache
=
cache
;
indexCacheRef
(
pCache
);
// encode data
CacheTerm
*
ct
=
calloc
(
1
,
sizeof
(
CacheTerm
));
if
(
cache
==
NULL
)
{
return
-
1
;
}
// set up key
ct
->
colId
=
colId
;
ct
->
colType
=
term
->
colType
;
ct
->
nColVal
=
term
->
nColVal
;
ct
->
colVal
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
(
ct
->
nColVal
+
1
));
memcpy
(
ct
->
colVal
,
term
->
colVal
,
ct
->
nColVal
);
ct
->
version
=
version
;
ct
->
version
=
atomic_add_fetch_32
(
&
pCache
->
version
,
1
)
;
// set value
ct
->
uid
=
uid
;
ct
->
operaType
=
term
->
operType
;
tSkipListPut
(
pCache
->
skiplist
,
(
char
*
)
ct
);
tSkipListPut
(
pCache
->
mem
,
(
char
*
)
ct
);
pCache
->
nTerm
+=
1
;
if
(
pCache
->
nTerm
>=
CACH_LIMIT
)
{
pCache
->
nTerm
=
0
;
while
(
pCache
->
imm
!=
NULL
)
{
// do nothong
}
pCache
->
imm
=
pCache
->
mem
;
pCache
->
mem
=
indexInternalCacheCreate
(
pCache
->
type
);
// sched to merge
// unref cache int bgwork
indexCacheSchedToMerge
(
pCache
);
}
indexCacheUnRef
(
pCache
);
return
0
;
// encode end
}
int
indexCacheDel
(
void
*
cache
,
int32_t
fieldId
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
int
indexCacheDel
(
void
*
cache
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
IndexCache
*
pCache
=
cache
;
return
0
;
}
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
)
{
int
indexCacheSearch
(
void
*
cache
,
SIndexTermQuery
*
query
,
SArray
*
result
,
STermValueType
*
s
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
}
IndexCache
*
pCache
=
cache
;
SIndexTerm
*
term
=
query
->
term
;
...
...
@@ -167,15 +169,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t
CacheTerm
*
ct
=
calloc
(
1
,
sizeof
(
CacheTerm
));
if
(
ct
==
NULL
)
{
return
-
1
;
}
ct
->
colId
=
colId
;
ct
->
nColVal
=
term
->
nColVal
;
ct
->
colVal
=
calloc
(
1
,
sizeof
(
char
)
*
(
ct
->
nColVal
+
1
));
memcpy
(
ct
->
colVal
,
term
->
colVal
,
ct
->
nColVal
);
ct
->
version
=
version
;
ct
->
version
=
atomic_load_32
(
&
pCache
->
version
)
;
char
*
key
=
getIndexKey
(
ct
);
// TODO handle multi situation later, and refactor
SSkipListIterator
*
iter
=
tSkipListCreateIterFromVal
(
pCache
->
skiplist
,
key
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
SSkipListIterator
*
iter
=
tSkipListCreateIterFromVal
(
pCache
->
mem
,
key
,
TSDB_DATA_TYPE_BINARY
,
TSDB_ORDER_ASC
);
while
(
tSkipListIterNext
(
iter
))
{
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
if
(
node
!=
NULL
)
{
...
...
@@ -209,3 +210,12 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t
}
return
0
;
}
void
indexCacheRef
(
IndexCache
*
cache
)
{
int
ref
=
T_REF_INC
(
cache
);
UNUSED
(
ref
);
}
void
indexCacheUnRef
(
IndexCache
*
cache
)
{
int
ref
=
T_REF_DEC
(
cache
);
if
(
ref
==
0
)
{
indexCacheDestroy
(
cache
);
}
}
source/libs/index/src/index_fst.c
浏览文件 @
c1605db6
...
...
@@ -1083,7 +1083,7 @@ bool fstBoundWithDataExceededBy(FstBoundWithData* bound, FstSlice* slice) {
}
else
if
(
bound
->
type
==
Excluded
)
{
return
comp
>=
0
?
true
:
false
;
}
else
{
return
tru
e
;
return
fals
e
;
}
}
bool
fstBoundWithDataIsEmpty
(
FstBoundWithData
*
bound
)
{
...
...
@@ -1224,7 +1224,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
void
*
start
=
automFuncs
[
aut
->
type
].
start
(
aut
);
if
(
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
start
))
{
FstSlice
s
=
fstSliceCreate
(
NULL
,
0
);
return
swsResultCreate
(
&
s
,
output
,
callback
(
start
));
return
swsResultCreate
(
&
s
,
output
,
callback
==
NULL
?
NULL
:
callback
(
start
));
}
}
SArray
*
nodes
=
taosArrayInit
(
8
,
sizeof
(
FstNode
*
));
...
...
@@ -1237,10 +1237,12 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
}
FstTransition
trn
;
fstNodeGetTransitionAt
(
p
->
node
,
p
->
trans
,
&
trn
);
Output
out
=
p
->
out
.
out
+
trn
.
out
;
void
*
nextState
=
automFuncs
[
aut
->
type
].
accept
(
aut
,
p
->
autState
,
trn
.
inp
);
void
*
tState
=
callback
(
nextState
);
bool
isMatch
=
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
nextState
);
Output
out
=
p
->
out
.
out
+
trn
.
out
;
void
*
nextState
=
automFuncs
[
aut
->
type
].
accept
(
aut
,
p
->
autState
,
trn
.
inp
);
void
*
tState
=
(
callback
==
NULL
)
?
NULL
:
callback
(
nextState
);
bool
isMatch
=
automFuncs
[
aut
->
type
].
isMatch
(
aut
,
nextState
);
FstNode
*
nextNode
=
fstGetNode
(
sws
->
fst
,
trn
.
addr
);
taosArrayPush
(
nodes
,
&
nextNode
);
taosArrayPush
(
sws
->
inp
,
&
(
trn
.
inp
));
...
...
source/libs/index/src/index_fst_automation.c
浏览文件 @
c1605db6
...
...
@@ -64,6 +64,25 @@ StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
return
nsv
;
}
// iterate fst
static
void
*
alwaysMatchStart
(
AutomationCtx
*
ctx
)
{
return
NULL
;
}
static
bool
alwaysMatchIsMatch
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
true
;
}
static
bool
alwaysMatchCanMatch
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
true
;
}
static
bool
alwaysMatchWillAlwaysMatch
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
true
;
}
static
void
*
alwaysMatchAccpet
(
AutomationCtx
*
ctx
,
void
*
state
,
uint8_t
byte
)
{
return
NULL
;
}
static
void
*
alwaysMatchAccpetEof
(
AutomationCtx
*
ctx
,
void
*
state
)
{
return
NULL
;
}
// prefix query, impl later
static
void
*
prefixStart
(
AutomationCtx
*
ctx
)
{
...
...
@@ -127,6 +146,7 @@ static void* patternAcceptEof(AutomationCtx* ctx, void* state) {
}
AutomationFunc
automFuncs
[]
=
{
{
alwaysMatchStart
,
alwaysMatchIsMatch
,
alwaysMatchCanMatch
,
alwaysMatchWillAlwaysMatch
,
alwaysMatchAccpet
,
alwaysMatchAccpetEof
},
{
prefixStart
,
prefixIsMatch
,
prefixCanMatch
,
prefixWillAlwaysMatch
,
prefixAccept
,
prefixAcceptEof
},
{
patternStart
,
patternIsMatch
,
patternCanMatch
,
patternWillAlwaysMatch
,
patternAccept
,
patternAcceptEof
}
// add more search type
...
...
@@ -137,7 +157,11 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
if
(
ctx
==
NULL
)
{
return
NULL
;
}
StartWithStateValue
*
sv
=
NULL
;
if
(
atype
==
AUTOMATION_PREFIX
)
{
if
(
atype
==
AUTOMATION_ALWAYS
)
{
int
val
=
0
;
sv
=
startWithStateValueCreate
(
Running
,
FST_INT
,
&
val
);
ctx
->
stdata
=
(
void
*
)
sv
;
}
else
if
(
atype
==
AUTOMATION_PREFIX
)
{
int
val
=
0
;
sv
=
startWithStateValueCreate
(
Running
,
FST_INT
,
&
val
);
ctx
->
stdata
=
(
void
*
)
sv
;
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
c1605db6
...
...
@@ -33,11 +33,9 @@ static int tfileWriteHeader(TFileWriter* writer);
static
int
tfileWriteFstOffset
(
TFileWriter
*
tw
,
int32_t
offset
);
static
int
tfileWriteData
(
TFileWriter
*
write
,
TFileValue
*
tval
);
static
int
tfileReaderLoadHeader
(
TFileReader
*
reader
);
static
int
tfileReaderLoadFst
(
TFileReader
*
reader
);
static
int
tfileReaderLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
);
static
void
tfileReaderRef
(
TFileReader
*
reader
);
static
void
tfileReaderUnRef
(
TFileReader
*
reader
);
static
int
tfileReaderLoadHeader
(
TFileReader
*
reader
);
static
int
tfileReaderLoadFst
(
TFileReader
*
reader
);
static
int
tfileReaderLoadTableIds
(
TFileReader
*
reader
,
int32_t
offset
,
SArray
*
result
);
static
int
tfileGetFileList
(
const
char
*
path
,
SArray
*
result
);
static
int
tfileRmExpireFile
(
SArray
*
result
);
...
...
@@ -131,7 +129,6 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
taosHashPut
(
tcache
->
tableCache
,
buf
,
strlen
(
buf
),
&
reader
,
sizeof
(
void
*
));
return
;
}
TFileReader
*
tfileReaderCreate
(
WriterCtx
*
ctx
)
{
TFileReader
*
reader
=
calloc
(
1
,
sizeof
(
TFileReader
));
if
(
reader
==
NULL
)
{
return
NULL
;
}
...
...
@@ -317,6 +314,11 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
return
0
;
}
TFileReader
*
tfileGetReaderByCol
(
IndexTFile
*
tf
,
char
*
colName
)
{
if
(
tf
==
NULL
)
{
return
NULL
;
}
TFileCacheKey
key
=
{.
suid
=
0
,
.
colType
=
TSDB_DATA_TYPE_BINARY
,
.
colName
=
colName
,
.
nColName
=
strlen
(
colName
)};
return
tfileCacheGet
(
tf
->
cache
,
&
key
);
}
static
int
tfileStrCompare
(
const
void
*
a
,
const
void
*
b
)
{
int
ret
=
strcmp
((
char
*
)
a
,
(
char
*
)
b
);
...
...
@@ -423,12 +425,12 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
free
(
buf
);
return
0
;
}
static
void
tfileReaderRef
(
TFileReader
*
reader
)
{
void
tfileReaderRef
(
TFileReader
*
reader
)
{
int
ref
=
T_REF_INC
(
reader
);
UNUSED
(
ref
);
}
static
void
tfileReaderUnRef
(
TFileReader
*
reader
)
{
void
tfileReaderUnRef
(
TFileReader
*
reader
)
{
int
ref
=
T_REF_DEC
(
reader
);
if
(
ref
==
0
)
{
tfileReaderDestroy
(
reader
);
}
}
...
...
@@ -479,9 +481,9 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
return
-
1
;
}
static
void
tfileSerialCacheKey
(
TFileCacheKey
*
key
,
char
*
buf
)
{
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
suid
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
colType
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
//
SERIALIZE_MEM_TO_BUF(buf, key, suid);
//
SERIALIZE_VAR_TO_BUF(buf, '_', char);
//
SERIALIZE_MEM_TO_BUF(buf, key, colType);
//
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF
(
buf
,
key
,
colName
,
key
->
nColName
);
}
source/libs/index/test/CMakeLists.txt
浏览文件 @
c1605db6
add_executable
(
indexTest
""
)
add_executable
(
fstTest
""
)
target_sources
(
indexTest
PRIVATE
"indexTests.cc"
)
target_sources
(
fstTest
PRIVATE
"fstTest.cc"
)
target_include_directories
(
indexTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
fstTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
indexTest
os
util
...
...
@@ -15,8 +25,16 @@ target_link_libraries (indexTest
gtest_main
index
)
add_test
(
NAME index_test
COMMAND indexTest
target_link_libraries
(
fstTest
os
util
common
gtest_main
index
)
#add_test(
# NAME index_test
# COMMAND indexTest
#)
source/libs/index/test/fstTest.cc
0 → 100644
浏览文件 @
c1605db6
#include <iostream>
#include <string>
#include <vector>
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#include "index_tfile.h"
#include "tskiplist.h"
#include "tutil.h"
void
*
callback
(
void
*
s
)
{
return
s
;
}
static
std
::
string
fileName
=
"/tmp/tindex.tindex"
;
class
FstWriter
{
public:
FstWriter
()
{
remove
(
fileName
.
c_str
());
_wc
=
writerCtxCreate
(
TFile
,
fileName
.
c_str
(),
false
,
64
*
1024
*
1024
);
_b
=
fstBuilderCreate
(
_wc
,
0
);
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstBuilderInsert
(
_b
,
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
~
FstWriter
()
{
fstBuilderFinish
(
_b
);
fstBuilderDestroy
(
_b
);
writerCtxDestroy
(
_wc
);
}
private:
FstBuilder
*
_b
;
WriterCtx
*
_wc
;
};
class
FstReadMemory
{
public:
FstReadMemory
(
size_t
size
)
{
_wc
=
writerCtxCreate
(
TFile
,
fileName
.
c_str
(),
true
,
64
*
1024
);
_w
=
fstCountingWriterCreate
(
_wc
);
_size
=
size
;
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
}
bool
init
()
{
char
*
buf
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
_size
);
int
nRead
=
fstCountingWriterRead
(
_w
,
(
uint8_t
*
)
buf
,
_size
);
if
(
nRead
<=
0
)
{
return
false
;
}
_size
=
nRead
;
_s
=
fstSliceCreate
((
uint8_t
*
)
buf
,
_size
);
_fst
=
fstCreate
(
&
_s
);
free
(
buf
);
return
_fst
!=
NULL
;
}
bool
Get
(
const
std
::
string
&
key
,
uint64_t
*
val
)
{
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstGet
(
_fst
,
&
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
bool
GetWithTimeCostUs
(
const
std
::
string
&
key
,
uint64_t
*
val
,
uint64_t
*
elapse
)
{
int64_t
s
=
taosGetTimestampUs
();
bool
ok
=
this
->
Get
(
key
,
val
);
int64_t
e
=
taosGetTimestampUs
();
*
elapse
=
e
-
s
;
return
ok
;
}
// add later
bool
Search
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>&
result
)
{
FstStreamBuilder
*
sb
=
fstSearch
(
_fst
,
ctx
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
// result.push_back((uint64_t)(rt->out.out));
FstSlice
*
s
=
&
rt
->
data
;
int32_t
sz
=
0
;
char
*
ch
=
(
char
*
)
fstSliceData
(
s
,
&
sz
);
std
::
string
key
(
ch
,
sz
);
printf
(
"key: %s, val: %"
PRIu64
"
\n
"
,
key
.
c_str
(),
(
uint64_t
)(
rt
->
out
.
out
));
swsResultDestroy
(
rt
);
}
for
(
size_t
i
=
0
;
i
<
result
.
size
();
i
++
)
{}
std
::
cout
<<
std
::
endl
;
return
true
;
}
bool
SearchWithTimeCostUs
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>&
result
)
{
int64_t
s
=
taosGetTimestampUs
();
bool
ok
=
this
->
Search
(
ctx
,
result
);
int64_t
e
=
taosGetTimestampUs
();
return
ok
;
}
~
FstReadMemory
()
{
fstCountingWriterDestroy
(
_w
);
fstDestroy
(
_fst
);
fstSliceDestroy
(
&
_s
);
writerCtxDestroy
(
_wc
);
}
private:
FstCountingWriter
*
_w
;
Fst
*
_fst
;
FstSlice
_s
;
WriterCtx
*
_wc
;
size_t
_size
;
};
#define L 100
#define M 100
#define N 100
int
Performance_fstWriteRecords
(
FstWriter
*
b
)
{
std
::
string
str
(
"aa"
);
for
(
int
i
=
0
;
i
<
L
;
i
++
)
{
str
[
0
]
=
'a'
+
i
;
str
.
resize
(
2
);
for
(
int
j
=
0
;
j
<
M
;
j
++
)
{
str
[
1
]
=
'a'
+
j
;
str
.
resize
(
2
);
for
(
int
k
=
0
;
k
<
N
;
k
++
)
{
str
.
push_back
(
'a'
);
b
->
Put
(
str
,
k
);
printf
(
"(%d, %d, %d, %s)
\n
"
,
i
,
j
,
k
,
str
.
c_str
());
}
}
}
return
L
*
M
*
N
;
}
void
checkFstCheckIterator
()
{
tfInit
();
FstWriter
*
fw
=
new
FstWriter
;
int64_t
s
=
taosGetTimestampUs
();
int
count
=
2
;
Performance_fstWriteRecords
(
fw
);
int64_t
e
=
taosGetTimestampUs
();
std
::
cout
<<
"insert data count : "
<<
count
<<
"elapas time: "
<<
e
-
s
<<
std
::
endl
;
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
()
==
false
)
{
std
::
cout
<<
"init readMemory failed"
<<
std
::
endl
;
delete
m
;
return
;
}
// prefix search
std
::
vector
<
uint64_t
>
result
;
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
"ab"
,
AUTOMATION_ALWAYS
);
m
->
Search
(
ctx
,
result
);
std
::
cout
<<
"size: "
<<
result
.
size
()
<<
std
::
endl
;
// assert(result.size() == count);
for
(
int
i
=
0
;
i
<
result
.
size
();
i
++
)
{
// assert(result[i] == i); // check result
}
free
(
ctx
);
delete
m
;
tfCleanup
();
}
int
main
()
{
checkFstCheckIterator
();
// checkFstPrefixSearch();
return
1
;
}
source/libs/index/test/indexTests.cc
浏览文件 @
c1605db6
...
...
@@ -471,18 +471,22 @@ class CacheObj {
public:
CacheObj
()
{
// TODO
cache
=
indexCacheCreate
();
cache
=
indexCacheCreate
(
NULL
,
"voltage"
,
TSDB_DATA_TYPE_BINARY
);
}
int
Put
(
SIndexTerm
*
term
,
int16_t
colId
,
int32_t
version
,
uint64_t
uid
)
{
int
ret
=
indexCachePut
(
cache
,
term
,
colId
,
version
,
uid
);
int
ret
=
indexCachePut
(
cache
,
term
,
uid
);
if
(
ret
!=
0
)
{
//
std
::
cout
<<
"failed to put into cache: "
<<
ret
<<
std
::
endl
;
}
return
ret
;
}
void
Debug
()
{
//
indexCacheDebug
(
cache
);
}
int
Get
(
SIndexTermQuery
*
query
,
int16_t
colId
,
int32_t
version
,
SArray
*
result
,
STermValueType
*
s
)
{
int
ret
=
indexCacheSearch
(
cache
,
query
,
colId
,
version
,
result
,
s
);
int
ret
=
indexCacheSearch
(
cache
,
query
,
result
,
s
);
if
(
ret
!=
0
)
{
//
std
::
cout
<<
"failed to get from cache:"
<<
ret
<<
std
::
endl
;
...
...
@@ -515,6 +519,7 @@ class IndexCacheEnv : public ::testing::Test {
TEST_F
(
IndexCacheEnv
,
cache_test
)
{
int
version
=
0
;
int16_t
colId
=
0
;
int16_t
othColId
=
10
;
uint64_t
suid
=
0
;
std
::
string
colName
(
"voltage"
);
...
...
@@ -544,15 +549,27 @@ TEST_F(IndexCacheEnv, cache_test) {
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
}
{
std
::
string
colVal
(
"v3"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
othColId
,
version
++
,
suid
++
);
}
{
std
::
string
colVal
(
"v4"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
othColId
,
version
++
,
suid
++
);
}
{
std
::
string
colVal
(
"v4"
);
for
(
size_t
i
=
0
;
i
<
10
0
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
10
;
i
++
)
{
colVal
[
colVal
.
size
()
-
1
]
=
'a'
+
i
;
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
coj
->
Put
(
term
,
colId
,
version
++
,
suid
++
);
}
}
coj
->
Debug
();
// begin query
{
std
::
string
colVal
(
"v3"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
...
...
@@ -561,7 +578,8 @@ TEST_F(IndexCacheEnv, cache_test) {
STermValueType
valType
;
coj
->
Get
(
&
query
,
colId
,
10000
,
ret
,
&
valType
);
assert
(
taosArrayGetSize
(
ret
)
==
3
);
// std::cout << "size : " << taosArrayGetSize(ret) << std::endl;
assert
(
taosArrayGetSize
(
ret
)
==
4
);
}
{
std
::
string
colVal
(
"v2"
);
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
c1605db6
...
...
@@ -92,8 +92,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
pRsp
->
vgVersion
=
hton
l
(
pRsp
->
vgVersion
);
pRsp
->
vgNum
=
hton
l
(
pRsp
->
vgNum
);
pRsp
->
vgVersion
=
ntoh
l
(
pRsp
->
vgVersion
);
pRsp
->
vgNum
=
ntoh
l
(
pRsp
->
vgNum
);
if
(
pRsp
->
vgNum
<
0
)
{
qError
(
"invalid db[%s] vgroup number[%d]"
,
pRsp
->
db
,
pRsp
->
vgNum
);
...
...
@@ -115,12 +115,12 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
hton
l
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
pRsp
->
vgroupInfo
[
i
].
hashBegin
=
hton
l
(
pRsp
->
vgroupInfo
[
i
].
hashBegin
);
pRsp
->
vgroupInfo
[
i
].
hashEnd
=
hton
l
(
pRsp
->
vgroupInfo
[
i
].
hashEnd
);
pRsp
->
vgroupInfo
[
i
].
vgId
=
ntoh
l
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
pRsp
->
vgroupInfo
[
i
].
hashBegin
=
ntoh
l
(
pRsp
->
vgroupInfo
[
i
].
hashBegin
);
pRsp
->
vgroupInfo
[
i
].
hashEnd
=
ntoh
l
(
pRsp
->
vgroupInfo
[
i
].
hashEnd
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
hton
s
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
ntoh
s
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
if
(
0
!=
taosHashPut
(
pOut
->
dbVgroup
.
vgInfo
,
&
pRsp
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pRsp
->
vgroupInfo
[
i
].
vgId
),
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
])))
{
...
...
@@ -142,13 +142,13 @@ _return:
}
static
int32_t
queryConvertTableMetaMsg
(
STableMetaMsg
*
pMetaMsg
)
{
pMetaMsg
->
numOfTags
=
hton
l
(
pMetaMsg
->
numOfTags
);
pMetaMsg
->
numOfColumns
=
hton
l
(
pMetaMsg
->
numOfColumns
);
pMetaMsg
->
sversion
=
hton
l
(
pMetaMsg
->
sversion
);
pMetaMsg
->
tversion
=
hton
l
(
pMetaMsg
->
tversion
);
pMetaMsg
->
numOfTags
=
ntoh
l
(
pMetaMsg
->
numOfTags
);
pMetaMsg
->
numOfColumns
=
ntoh
l
(
pMetaMsg
->
numOfColumns
);
pMetaMsg
->
sversion
=
ntoh
l
(
pMetaMsg
->
sversion
);
pMetaMsg
->
tversion
=
ntoh
l
(
pMetaMsg
->
tversion
);
pMetaMsg
->
tuid
=
htobe64
(
pMetaMsg
->
tuid
);
pMetaMsg
->
suid
=
htobe64
(
pMetaMsg
->
suid
);
pMetaMsg
->
vgId
=
hton
l
(
pMetaMsg
->
vgId
);
pMetaMsg
->
vgId
=
ntoh
l
(
pMetaMsg
->
vgId
);
if
(
pMetaMsg
->
numOfTags
<
0
||
pMetaMsg
->
numOfTags
>
TSDB_MAX_TAGS
)
{
qError
(
"invalid numOfTags[%d] in table meta rsp msg"
,
pMetaMsg
->
numOfTags
);
...
...
@@ -179,8 +179,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
int32_t
numOfTotalCols
=
pMetaMsg
->
numOfColumns
+
pMetaMsg
->
numOfTags
;
for
(
int
i
=
0
;
i
<
numOfTotalCols
;
++
i
)
{
pSchema
->
bytes
=
hton
l
(
pSchema
->
bytes
);
pSchema
->
colId
=
hton
l
(
pSchema
->
colId
);
pSchema
->
bytes
=
ntoh
l
(
pSchema
->
bytes
);
pSchema
->
colId
=
ntoh
l
(
pSchema
->
colId
);
pSchema
++
;
}
...
...
@@ -202,7 +202,8 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
qError
(
"calloc size[%d] failed"
,
metaSize
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pTableMeta
->
vgId
=
isSuperTable
?
0
:
msg
->
vgId
;
pTableMeta
->
tableType
=
isSuperTable
?
TSDB_SUPER_TABLE
:
msg
->
tableType
;
pTableMeta
->
uid
=
msg
->
suid
;
pTableMeta
->
suid
=
msg
->
suid
;
...
...
@@ -213,12 +214,12 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
pTableMeta
->
tableInfo
.
precision
=
msg
->
precision
;
pTableMeta
->
tableInfo
.
numOfColumns
=
msg
->
numOfColumns
;
memcpy
(
pTableMeta
->
schema
,
msg
->
pSchema
,
sizeof
(
SSchema
)
*
total
);
for
(
int32_t
i
=
0
;
i
<
msg
->
numOfColumns
;
++
i
)
{
pTableMeta
->
tableInfo
.
rowSize
+=
pTableMeta
->
schema
[
i
].
bytes
;
}
memcpy
(
pTableMeta
->
schema
,
msg
->
pSchema
,
sizeof
(
SSchema
)
*
total
);
*
pMeta
=
pTableMeta
;
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/qworker/CMakeLists.txt
浏览文件 @
c1605db6
...
...
@@ -10,3 +10,5 @@ target_link_libraries(
qworker
PRIVATE os util transport planner qcom
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
source/libs/qworker/src/qworker.c
浏览文件 @
c1605db6
...
...
@@ -943,6 +943,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qError
(
"invalid query msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
schedulerId
=
htobe64
(
msg
->
schedulerId
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
msg
->
contentLen
=
ntohl
(
msg
->
contentLen
);
bool
queryDone
=
false
;
bool
queryRsp
=
false
;
...
...
source/libs/qworker/test/CMakeLists.txt
0 → 100644
浏览文件 @
c1605db6
MESSAGE
(
STATUS
"build qworker unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
qworkerTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
qworkerTest
PUBLIC os util common transport gtest qcom planner qworker
)
TARGET_INCLUDE_DIRECTORIES
(
qworkerTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/qworker/"
PRIVATE
"
${
CMAKE_SOURCE_DIR
}
/source/libs/qworker/inc"
)
source/libs/qworker/test/qworkerTests.cpp
0 → 100644
浏览文件 @
c1605db6
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "tep.h"
#include "trpc.h"
#include "planner.h"
#include "qworker.h"
#include "stub.h"
#include "addr_any.h"
namespace
{
int32_t
qwtStringToPlan
(
const
char
*
str
,
SSubplan
**
subplan
)
{
return
0
;
}
void
stubSetStringToPlan
()
{
static
Stub
stub
;
stub
.
set
(
qStringToSubplan
,
qwtStringToPlan
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^qStringToSubplan$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
qwtStringToPlan
);
}
}
}
}
TEST
(
testCase
,
normalCase
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SRpcMsg
queryRpc
=
{
0
};
SRpcMsg
readyRpc
=
{
0
};
SRpcMsg
fetchRpc
=
{
0
};
SRpcMsg
dropRpc
=
{
0
};
SSubQueryMsg
*
queryMsg
=
(
SSubQueryMsg
*
)
calloc
(
1
,
sizeof
(
SSubQueryMsg
)
+
100
);
queryMsg
->
queryId
=
htobe64
(
1
);
queryMsg
->
schedulerId
=
htobe64
(
1
);
queryMsg
->
taskId
=
htobe64
(
1
);
queryMsg
->
contentLen
=
htonl
(
100
);
queryRpc
.
pCont
=
queryMsg
;
SResReadyMsg
readyMsg
=
{
0
};
readyMsg
.
schedulerId
=
htobe64
(
1
);
readyMsg
.
queryId
=
htobe64
(
1
);
readyMsg
.
taskId
=
htobe64
(
1
);
readyRpc
.
pCont
=
&
readyMsg
;
SResFetchMsg
fetchMsg
=
{
0
};
fetchMsg
.
schedulerId
=
htobe64
(
1
);
fetchMsg
.
queryId
=
htobe64
(
1
);
fetchMsg
.
taskId
=
htobe64
(
1
);
fetchRpc
.
pCont
=
&
fetchMsg
;
STaskDropMsg
dropMsg
=
{
0
};
dropMsg
.
schedulerId
=
htobe64
(
1
);
dropMsg
.
queryId
=
htobe64
(
1
);
dropMsg
.
taskId
=
htobe64
(
1
);
dropRpc
.
pCont
=
&
dropMsg
;
stubSetStringToPlan
();
code
=
qWorkerInit
(
NULL
,
&
mgmt
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessQueryMsg
(
mockPointer
,
mgmt
,
&
queryRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessReadyMsg
(
mockPointer
,
mgmt
,
&
readyRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessFetchMsg
(
mockPointer
,
mgmt
,
&
fetchRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
&
dropRpc
);
ASSERT_EQ
(
code
,
0
);
}
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
c1605db6
...
...
@@ -43,7 +43,7 @@ typedef struct SSchedulerMgmt {
SHashObj
*
jobs
;
// key: queryId, value: SQueryJob*
}
SSchedulerMgmt
;
typedef
struct
S
Query
Level
{
typedef
struct
S
Sch
Level
{
int32_t
level
;
int8_t
status
;
SRWLatch
lock
;
...
...
@@ -51,12 +51,12 @@ typedef struct SQueryLevel {
int32_t
taskSucceed
;
int32_t
taskNum
;
SArray
*
subTasks
;
// Element is SQueryTask
}
S
Query
Level
;
}
S
Sch
Level
;
typedef
struct
S
Query
Task
{
typedef
struct
S
Sch
Task
{
uint64_t
taskId
;
// task id
S
QueryLevel
*
level
;
// level
S
SchLevel
*
level
;
// level
SSubplan
*
plan
;
// subplan
char
*
msg
;
// operator tree
int32_t
msgLen
;
// msg length
...
...
@@ -66,13 +66,20 @@ typedef struct SQueryTask {
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
}
S
Query
Task
;
}
S
Sch
Task
;
typedef
struct
SQueryJob
{
typedef
struct
SSchJobAttr
{
bool
needFetch
;
bool
syncSchedule
;
bool
queryJob
;
}
SSchJobAttr
;
typedef
struct
SSchJob
{
uint64_t
queryId
;
int32_t
levelNum
;
int32_t
levelIdx
;
int8_t
status
;
SSchJobAttr
attr
;
SQueryProfileSummary
summary
;
SEpSet
dataSrcEps
;
SEpAddr
resEp
;
...
...
@@ -81,15 +88,19 @@ typedef struct SQueryJob {
tsem_t
rspSem
;
int32_t
userFetch
;
int32_t
remoteFetch
;
void
*
res
;
SSchTask
*
fetchTask
;
int32_t
errCode
;
void
*
res
;
int32_t
resNumOfRows
;
SHashObj
*
execTasks
;
// executing tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
SArray
*
levels
;
// Element is SQueryLevel, starting from 0.
SArray
*
subPlans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
S
Query
Job
;
}
S
Sch
Job
;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
...
...
@@ -108,7 +119,7 @@ typedef struct SQueryJob {
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
extern
int32_t
schLaunchTask
(
S
QueryJob
*
job
,
SQuery
Task
*
task
);
extern
int32_t
schLaunchTask
(
S
SchJob
*
job
,
SSch
Task
*
task
);
#ifdef __cplusplus
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
c1605db6
此差异已折叠。
点击以展开。
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
c1605db6
...
...
@@ -26,72 +26,310 @@
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "catalog.h"
#include "scheduler.h"
#include "catalog.h"
#include "scheduler.h"
#include "tep.h"
#include "trpc.h"
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
namespace
{
void
mockBuildDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x111111111111
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
dag
->
pSubplans
=
taosArrayInit
(
dag
->
numOfSubplans
,
POINTER_BYTES
);
SArray
*
scan
=
taosArrayInit
(
1
,
sizeof
(
SSubplan
));
SArray
*
merge
=
taosArrayInit
(
1
,
sizeof
(
SSubplan
));
SSubplan
scanPlan
=
{
0
};
SSubplan
mergePlan
=
{
0
};
scanPlan
.
id
.
queryId
=
qId
;
scanPlan
.
id
.
templateId
=
0x2222222222
;
scanPlan
.
id
.
subplanId
=
0x3333333333
;
scanPlan
.
type
=
QUERY_TYPE_SCAN
;
scanPlan
.
level
=
1
;
scanPlan
.
execEpSet
.
numOfEps
=
1
;
scanPlan
.
pChildern
=
NULL
;
scanPlan
.
pParents
=
taosArrayInit
(
1
,
POINTER_BYTES
);
mergePlan
.
id
.
queryId
=
qId
;
mergePlan
.
id
.
templateId
=
0x4444444444
;
mergePlan
.
id
.
subplanId
=
0x5555555555
;
mergePlan
.
type
=
QUERY_TYPE_MERGE
;
mergePlan
.
level
=
0
;
mergePlan
.
execEpSet
.
numOfEps
=
1
;
mergePlan
.
pChildern
=
taosArrayInit
(
1
,
POINTER_BYTES
);
mergePlan
.
pParents
=
NULL
;
SSubplan
*
mergePointer
=
(
SSubplan
*
)
taosArrayPush
(
merge
,
&
mergePlan
);
SSubplan
*
scanPointer
=
(
SSubplan
*
)
taosArrayPush
(
scan
,
&
scanPlan
);
taosArrayPush
(
mergePointer
->
pChildern
,
&
scanPointer
);
taosArrayPush
(
scanPointer
->
pParents
,
&
mergePointer
);
taosArrayPush
(
dag
->
pSubplans
,
&
merge
);
taosArrayPush
(
dag
->
pSubplans
,
&
scan
);
}
extern
"C"
int32_t
schHandleRspMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
void
schtBuildQueryDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000001
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
dag
->
pSubplans
=
taosArrayInit
(
dag
->
numOfSubplans
,
POINTER_BYTES
);
SArray
*
scan
=
taosArrayInit
(
1
,
sizeof
(
SSubplan
));
SArray
*
merge
=
taosArrayInit
(
1
,
sizeof
(
SSubplan
));
SSubplan
scanPlan
=
{
0
};
SSubplan
mergePlan
=
{
0
};
scanPlan
.
id
.
queryId
=
qId
;
scanPlan
.
id
.
templateId
=
0x0000000000000002
;
scanPlan
.
id
.
subplanId
=
0x0000000000000003
;
scanPlan
.
type
=
QUERY_TYPE_SCAN
;
scanPlan
.
level
=
1
;
scanPlan
.
execEpSet
.
numOfEps
=
1
;
scanPlan
.
execEpSet
.
port
[
0
]
=
6030
;
strcpy
(
scanPlan
.
execEpSet
.
fqdn
[
0
],
"ep0"
);
scanPlan
.
pChildern
=
NULL
;
scanPlan
.
pParents
=
taosArrayInit
(
1
,
POINTER_BYTES
);
scanPlan
.
pNode
=
(
SPhyNode
*
)
calloc
(
1
,
sizeof
(
SPhyNode
));
mergePlan
.
id
.
queryId
=
qId
;
mergePlan
.
id
.
templateId
=
0x4444444444
;
mergePlan
.
id
.
subplanId
=
0x5555555555
;
mergePlan
.
type
=
QUERY_TYPE_MERGE
;
mergePlan
.
level
=
0
;
mergePlan
.
execEpSet
.
numOfEps
=
0
;
mergePlan
.
pChildern
=
taosArrayInit
(
1
,
POINTER_BYTES
);
mergePlan
.
pParents
=
NULL
;
mergePlan
.
pNode
=
(
SPhyNode
*
)
calloc
(
1
,
sizeof
(
SPhyNode
));
SSubplan
*
mergePointer
=
(
SSubplan
*
)
taosArrayPush
(
merge
,
&
mergePlan
);
SSubplan
*
scanPointer
=
(
SSubplan
*
)
taosArrayPush
(
scan
,
&
scanPlan
);
taosArrayPush
(
mergePointer
->
pChildern
,
&
scanPointer
);
taosArrayPush
(
scanPointer
->
pParents
,
&
mergePointer
);
taosArrayPush
(
dag
->
pSubplans
,
&
merge
);
taosArrayPush
(
dag
->
pSubplans
,
&
scan
);
}
void
schtBuildInsertDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000002
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
dag
->
pSubplans
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
inserta
=
taosArrayInit
(
dag
->
numOfSubplans
,
sizeof
(
SSubplan
));
SSubplan
insertPlan
[
2
]
=
{
0
};
insertPlan
[
0
].
id
.
queryId
=
qId
;
insertPlan
[
0
].
id
.
templateId
=
0x0000000000000003
;
insertPlan
[
0
].
id
.
subplanId
=
0x0000000000000004
;
insertPlan
[
0
].
type
=
QUERY_TYPE_MODIFY
;
insertPlan
[
0
].
level
=
0
;
insertPlan
[
0
].
execEpSet
.
numOfEps
=
1
;
insertPlan
[
0
].
execEpSet
.
port
[
0
]
=
6030
;
strcpy
(
insertPlan
[
0
].
execEpSet
.
fqdn
[
0
],
"ep0"
);
insertPlan
[
0
].
pChildern
=
NULL
;
insertPlan
[
0
].
pParents
=
NULL
;
insertPlan
[
0
].
pNode
=
NULL
;
insertPlan
[
0
].
pDataSink
=
(
SDataSink
*
)
calloc
(
1
,
sizeof
(
SDataSink
));
insertPlan
[
1
].
id
.
queryId
=
qId
;
insertPlan
[
1
].
id
.
templateId
=
0x0000000000000003
;
insertPlan
[
1
].
id
.
subplanId
=
0x0000000000000005
;
insertPlan
[
1
].
type
=
QUERY_TYPE_MODIFY
;
insertPlan
[
1
].
level
=
0
;
insertPlan
[
1
].
execEpSet
.
numOfEps
=
1
;
insertPlan
[
1
].
execEpSet
.
port
[
0
]
=
6030
;
strcpy
(
insertPlan
[
1
].
execEpSet
.
fqdn
[
0
],
"ep1"
);
insertPlan
[
1
].
pChildern
=
NULL
;
insertPlan
[
1
].
pParents
=
NULL
;
insertPlan
[
1
].
pNode
=
NULL
;
insertPlan
[
1
].
pDataSink
=
(
SDataSink
*
)
calloc
(
1
,
sizeof
(
SDataSink
));
taosArrayPush
(
inserta
,
&
insertPlan
[
0
]);
taosArrayPush
(
inserta
,
&
insertPlan
[
1
]);
taosArrayPush
(
dag
->
pSubplans
,
&
inserta
);
}
int32_t
schtPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
,
int32_t
*
len
)
{
*
str
=
(
char
*
)
calloc
(
1
,
20
);
*
len
=
20
;
return
0
;
}
int32_t
schtExecNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SEpAddr
*
ep
)
{
return
0
;
}
void
schtSetPlanToString
()
{
static
Stub
stub
;
stub
.
set
(
qSubPlanToString
,
schtPlanToString
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^qSubPlanToString$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
schtPlanToString
);
}
}
}
void
schtSetExecNode
()
{
static
Stub
stub
;
stub
.
set
(
qSetSubplanExecutionNode
,
schtExecNode
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^qSetSubplanExecutionNode$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
schtExecNode
);
}
}
}
TEST
(
testCase
,
normalCase
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
schtSendRsp
(
void
*
param
)
{
SSchJob
*
job
=
NULL
;
int32_t
code
=
0
;
while
(
true
)
{
job
=
*
(
SSchJob
**
)
param
;
if
(
job
)
{
break
;
}
usleep
(
1000
);
}
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SShellSubmitRspMsg
rsp
=
{
0
};
rsp
.
affectedRows
=
10
;
schHandleRspMsg
(
job
,
task
,
TDMT_VND_SUBMIT
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
return
NULL
;
}
void
*
pInsertJob
=
NULL
;
}
TEST
(
queryTest
,
normalCase
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
void
*
pJob
=
NULL
;
SQueryDag
dag
=
{
0
};
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
int32_t
code
=
schedulerInit
(
NULL
);
SVgroupInfo
vgInfo
=
{
0
};
void
*
pJob
=
NULL
;
SQueryDag
dag
=
{
0
};
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SEpAddr
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
int32_t
code
=
schedulerInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
mockBuildDag
(
&
dag
);
code
=
scheduleExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pJob
);
schtBuildQueryDag
(
&
dag
);
schtSetPlanToString
();
schtSetExecNode
();
code
=
scheduleAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pJob
);
ASSERT_EQ
(
code
,
0
);
}
SSchJob
*
job
=
(
SSchJob
*
)
pJob
;
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
SRetrieveTableRsp
rsp
=
{
0
};
rsp
.
completed
=
1
;
rsp
.
numOfRows
=
10
;
code
=
schHandleRspMsg
(
job
,
NULL
,
TDMT_VND_FETCH
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
void
*
data
=
NULL
;
code
=
scheduleFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
ASSERT_EQ
(
pRsp
->
completed
,
1
);
ASSERT_EQ
(
pRsp
->
numOfRows
,
10
);
data
=
NULL
;
code
=
scheduleFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
data
,
(
void
*
)
NULL
);
scheduleFreeJob
(
pJob
);
}
TEST
(
insertTest
,
normalCase
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SQueryDag
dag
=
{
0
};
uint64_t
numOfRows
=
0
;
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SEpAddr
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
int32_t
code
=
schedulerInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
schtBuildInsertDag
(
&
dag
);
schtSetPlanToString
();
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
thread1
;
pthread_create
(
&
(
thread1
),
&
thattr
,
schtSendRsp
,
&
pInsertJob
);
code
=
scheduleExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pInsertJob
,
&
numOfRows
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
numOfRows
,
20
);
scheduleFreeJob
(
pInsertJob
);
}
int
main
(
int
argc
,
char
**
argv
)
{
...
...
@@ -101,4 +339,4 @@ int main(int argc, char** argv) {
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录