Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f7239014
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f7239014
编写于
3月 03, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/3.0_mhli
上级
64d224a0
47f4edc3
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
1057 addition
and
120 deletion
+1057
-120
include/libs/index/index.h
include/libs/index/index.h
+102
-12
include/libs/monitor/monitor.h
include/libs/monitor/monitor.h
+17
-11
source/dnode/mgmt/impl/inc/dndEnv.h
source/dnode/mgmt/impl/inc/dndEnv.h
+3
-0
source/dnode/mgmt/impl/src/dndEnv.c
source/dnode/mgmt/impl/src/dndEnv.c
+2
-0
source/dnode/mgmt/impl/src/dndMgmt.c
source/dnode/mgmt/impl/src/dndMgmt.c
+20
-6
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+42
-19
source/libs/index/inc/index_comm.h
source/libs/index/inc/index_comm.h
+32
-0
source/libs/index/src/index.c
source/libs/index/src/index.c
+19
-7
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+22
-12
source/libs/index/src/index_comm.c
source/libs/index/src/index_comm.c
+48
-0
source/libs/index/src/index_json.c
source/libs/index/src/index_json.c
+44
-0
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+23
-6
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+18
-0
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+15
-10
source/libs/index/test/fstUT.cc
source/libs/index/test/fstUT.cc
+21
-5
source/libs/index/test/jsonUT.cc
source/libs/index/test/jsonUT.cc
+135
-0
source/libs/monitor/src/monitor.c
source/libs/monitor/src/monitor.c
+235
-16
source/libs/monitor/test/monTest.cpp
source/libs/monitor/test/monTest.cpp
+247
-6
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+12
-10
未找到文件。
include/libs/index/index.h
浏览文件 @
f7239014
...
@@ -29,6 +29,12 @@ typedef struct SIndexOpts SIndexOpts;
...
@@ -29,6 +29,12 @@ typedef struct SIndexOpts SIndexOpts;
typedef
struct
SIndexMultiTermQuery
SIndexMultiTermQuery
;
typedef
struct
SIndexMultiTermQuery
SIndexMultiTermQuery
;
typedef
struct
SArray
SIndexMultiTerm
;
typedef
struct
SArray
SIndexMultiTerm
;
typedef
struct
SIndex
SIndexJson
;
typedef
struct
SIndexTerm
SIndexJsonTerm
;
typedef
struct
SIndexOpts
SIndexJsonOpts
;
typedef
struct
SIndexMultiTermQuery
SIndexJsonMultiTermQuery
;
typedef
struct
SArray
SIndexJsonMultiTerm
;
typedef
enum
{
typedef
enum
{
ADD_VALUE
,
// add index colume value
ADD_VALUE
,
// add index colume value
DEL_VALUE
,
// delete index column value
DEL_VALUE
,
// delete index column value
...
@@ -39,24 +45,108 @@ typedef enum {
...
@@ -39,24 +45,108 @@ typedef enum {
}
SIndexOperOnColumn
;
}
SIndexOperOnColumn
;
typedef
enum
{
MUST
=
0
,
SHOULD
=
1
,
NOT
=
2
}
EIndexOperatorType
;
typedef
enum
{
MUST
=
0
,
SHOULD
=
1
,
NOT
=
2
}
EIndexOperatorType
;
typedef
enum
{
QUERY_TERM
=
0
,
QUERY_PREFIX
=
1
,
QUERY_SUFFIX
=
2
,
QUERY_REGEX
=
3
}
EIndexQueryType
;
typedef
enum
{
QUERY_TERM
=
0
,
QUERY_PREFIX
=
1
,
QUERY_SUFFIX
=
2
,
QUERY_REGEX
=
3
,
QUERY_RANGE
=
4
}
EIndexQueryType
;
/*
/*
*
@param: oper
*
create multi query
*
*
@param oper (input, relation between querys)
*/
*/
SIndexMultiTermQuery
*
indexMultiTermQueryCreate
(
EIndexOperatorType
oper
);
SIndexMultiTermQuery
*
indexMultiTermQueryCreate
(
EIndexOperatorType
oper
);
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
);
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
SIndexTerm
*
term
,
EIndexQueryType
type
);
/*
/*
* @param:
* destroy multi query
* @param:
* @param pQuery (input, multi-query-object to be destory)
*/
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
);
/*
* add query to multi query
* @param pQuery (input, multi-query-object)
* @param term (input, single query term)
* @param type (input, single query type)
* @return error code
*/
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
SIndexTerm
*
term
,
EIndexQueryType
type
);
/*
* open index
* @param opt (input, index opt)
* @param path (input, index path)
* @param index (output, index object)
* @return error code
*/
int
indexOpen
(
SIndexOpts
*
opt
,
const
char
*
path
,
SIndex
**
index
);
/*
* close index
* @param index (input, index to be closed)
* @return error code
*/
*/
int
indexOpen
(
SIndexOpts
*
opt
,
const
char
*
path
,
SIndex
**
index
);
void
indexClose
(
SIndex
*
index
);
void
indexClose
(
SIndex
*
index
);
int
indexPut
(
SIndex
*
index
,
SIndexMultiTerm
*
terms
,
uint64_t
uid
);
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
);
/*
int
indexSearch
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
);
* insert terms into index
int
indexRebuild
(
SIndex
*
index
,
SIndexOpts
*
opt
);
* @param index (input, index object)
* @param term (input, terms inserted into index)
* @param uid (input, uid of terms)
* @return error code
*/
int
indexPut
(
SIndex
*
index
,
SIndexMultiTerm
*
terms
,
uint64_t
uid
);
/*
* delete terms that meet query condition
* @param index (input, index object)
* @param query (input, condition query to deleted)
* @return error code
*/
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
);
/*
* search index
* @param index (input, index object)
* @param query (input, multi query condition)
* @param result(output, query result)
* @return error code
*/
int
indexSearch
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
);
/*
* rebuild index
* @param index (input, index object)
* @parma opt (input, rebuild index opts)
* @return error code
*/
int
indexRebuild
(
SIndex
*
index
,
SIndexOpts
*
opt
);
/*
* open index
* @param opt (input,index json opt)
* @param path (input, index json path)
* @param index (output, index json object)
* @return error code
*/
int
tIndexJsonOpen
(
SIndexJsonOpts
*
opts
,
const
char
*
path
,
SIndexJson
**
index
);
/*
* close index
* @param index (input, index to be closed)
* @return void
*/
void
tIndexJsonClose
(
SIndexJson
*
index
);
/*
* insert terms into index
* @param index (input, index object)
* @param term (input, terms inserted into index)
* @param uid (input, uid of terms)
* @return error code
*/
int
tIndexJsonPut
(
SIndexJson
*
index
,
SIndexJsonMultiTerm
*
terms
,
uint64_t
uid
);
/*
* search index
* @param index (input, index object)
* @param query (input, multi query condition)
* @param result(output, query result)
* @return error code
*/
int
tIndexJsonSearch
(
SIndexJson
*
index
,
SIndexJsonMultiTermQuery
*
query
,
SArray
*
result
);
/*
/*
* @param
* @param
* @param
* @param
...
...
include/libs/monitor/monitor.h
浏览文件 @
f7239014
...
@@ -23,6 +23,11 @@
...
@@ -23,6 +23,11 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
#define MON_STATUS_LEN 8
#define MON_ROLE_LEN 9
#define MON_VER_LEN 12
#define MON_LOG_LEN 1024
typedef
struct
{
typedef
struct
{
int32_t
dnode_id
;
int32_t
dnode_id
;
char
dnode_ep
[
TSDB_EP_LEN
];
char
dnode_ep
[
TSDB_EP_LEN
];
...
@@ -31,19 +36,19 @@ typedef struct {
...
@@ -31,19 +36,19 @@ typedef struct {
typedef
struct
{
typedef
struct
{
int32_t
dnode_id
;
int32_t
dnode_id
;
char
dnode_ep
[
TSDB_EP_LEN
];
char
dnode_ep
[
TSDB_EP_LEN
];
char
status
[
8
];
char
status
[
MON_STATUS_LEN
];
}
SMonDnodeDesc
;
}
SMonDnodeDesc
;
typedef
struct
{
typedef
struct
{
int32_t
mnode_id
;
int32_t
mnode_id
;
char
mnode_ep
[
TSDB_EP_LEN
];
char
mnode_ep
[
TSDB_EP_LEN
];
char
role
[
8
];
char
role
[
MON_ROLE_LEN
];
}
SMonMnodeDesc
;
}
SMonMnodeDesc
;
typedef
struct
{
typedef
struct
{
char
first_ep
[
TSDB_EP_LEN
];
char
first_ep
[
TSDB_EP_LEN
];
int32_t
first_ep_dnode_id
;
int32_t
first_ep_dnode_id
;
char
version
[
12
];
char
version
[
MON_VER_LEN
];
float
master_uptime
;
// day
float
master_uptime
;
// day
int32_t
monitor_interval
;
// sec
int32_t
monitor_interval
;
// sec
int32_t
vgroups_total
;
int32_t
vgroups_total
;
...
@@ -57,19 +62,18 @@ typedef struct {
...
@@ -57,19 +62,18 @@ typedef struct {
typedef
struct
{
typedef
struct
{
int32_t
dnode_id
;
int32_t
dnode_id
;
int8_t
vnode_online
;
char
vnode_role
[
MON_ROLE_LEN
];
char
vnode_role
[
8
];
}
SMonVnodeDesc
;
}
SMonVnodeDesc
;
typedef
struct
{
typedef
struct
{
int32_t
vgroup_id
;
int32_t
vgroup_id
;
char
database_name
[
TSDB_DB_NAME_LEN
];
int32_t
tables_num
;
char
status
[
MON_STATUS_LEN
];
SMonVnodeDesc
vnodes
[
TSDB_MAX_REPLICA
];
SMonVnodeDesc
vnodes
[
TSDB_MAX_REPLICA
];
}
SMonVgroupDesc
;
}
SMonVgroupDesc
;
typedef
struct
{
typedef
struct
{
char
database_name
[
TSDB_DB_NAME_LEN
];
int32_t
tables_num
;
int8_t
status
;
SArray
*
vgroups
;
// array of SMonVgroupDesc
SArray
*
vgroups
;
// array of SMonVgroupDesc
}
SMonVgroupInfo
;
}
SMonVgroupInfo
;
...
@@ -107,7 +111,7 @@ typedef struct {
...
@@ -107,7 +111,7 @@ typedef struct {
int32_t
errors
;
int32_t
errors
;
int32_t
vnodes_num
;
int32_t
vnodes_num
;
int32_t
masters
;
int32_t
masters
;
int
32_t
has_mnode
;
int
8_t
has_mnode
;
}
SMonDnodeInfo
;
}
SMonDnodeInfo
;
typedef
struct
{
typedef
struct
{
...
@@ -117,13 +121,15 @@ typedef struct {
...
@@ -117,13 +121,15 @@ typedef struct {
}
SMonDiskDesc
;
}
SMonDiskDesc
;
typedef
struct
{
typedef
struct
{
SArray
*
disks
;
// array of SMonDiskDesc
SArray
*
datadirs
;
// array of SMonDiskDesc
SMonDiskDesc
logdir
;
SMonDiskDesc
tempdir
;
}
SMonDiskInfo
;
}
SMonDiskInfo
;
typedef
struct
{
typedef
struct
{
int64_t
ts
;
int64_t
ts
;
int8_t
level
;
int8_t
level
;
char
content
[
1024
];
char
content
[
MON_LOG_LEN
];
}
SMonLogItem
;
}
SMonLogItem
;
typedef
struct
SMonInfo
SMonInfo
;
typedef
struct
SMonInfo
SMonInfo
;
...
...
source/dnode/mgmt/impl/inc/dndEnv.h
浏览文件 @
f7239014
...
@@ -137,6 +137,9 @@ typedef struct SDnode {
...
@@ -137,6 +137,9 @@ typedef struct SDnode {
SStartupReq
startup
;
SStartupReq
startup
;
}
SDnode
;
}
SDnode
;
int32_t
dndGetDiskInfo
(
SDnode
*
pDnode
,
SMonDiskInfo
*
pInfo
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/mgmt/impl/src/dndEnv.c
浏览文件 @
f7239014
...
@@ -323,3 +323,5 @@ void dndCleanup() {
...
@@ -323,3 +323,5 @@ void dndCleanup() {
taosStopCacheRefreshWorker
();
taosStopCacheRefreshWorker
();
dInfo
(
"dnode env is cleaned up"
);
dInfo
(
"dnode env is cleaned up"
);
}
}
int32_t
dndGetDiskInfo
(
SDnode
*
pDnode
,
SMonDiskInfo
*
pInfo
)
{
return
0
;
}
\ No newline at end of file
source/dnode/mgmt/impl/src/dndMgmt.c
浏览文件 @
f7239014
...
@@ -474,21 +474,25 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
...
@@ -474,21 +474,25 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
rpcSendResponse
(
&
rpcRsp
);
rpcSendResponse
(
&
rpcRsp
);
}
}
void
dndGetBasicInfo
(
SDnode
*
pDnode
,
SMonBasicInfo
*
pInfo
)
{
static
int32_t
dndGetBasicInfo
(
SDnode
*
pDnode
,
SMonBasicInfo
*
pInfo
)
{
pInfo
->
dnode_id
=
dndGetDnodeId
(
pDnode
);
pInfo
->
dnode_id
=
dndGetDnodeId
(
pDnode
);
tstrncpy
(
pInfo
->
dnode_ep
,
tsLocalEp
,
TSDB_EP_LEN
);
tstrncpy
(
pInfo
->
dnode_ep
,
tsLocalEp
,
TSDB_EP_LEN
);
return
0
;
}
}
static
int32_t
dndGetDnodeInfo
(
SDnode
*
pDnode
,
SMonDnodeInfo
*
pInfo
)
{
return
0
;
}
static
void
dndSendMonitorReport
(
SDnode
*
pDnode
)
{
static
void
dndSendMonitorReport
(
SDnode
*
pDnode
)
{
if
(
!
tsEnableMonitor
||
tsMonitorFqdn
[
0
]
==
0
)
return
;
if
(
!
tsEnableMonitor
||
tsMonitorFqdn
[
0
]
==
0
||
tsMonitorPort
==
0
)
return
;
dTrace
(
"pDnode:%p, send monitor report to %s:%u"
,
pDnode
,
tsMonitorFqdn
,
tsMonitorPort
);
SMonInfo
*
pMonitor
=
monCreateMonitorInfo
();
SMonInfo
*
pMonitor
=
monCreateMonitorInfo
();
if
(
pMonitor
==
NULL
)
return
;
if
(
pMonitor
==
NULL
)
return
;
dTrace
(
"pDnode:%p, send monitor report to %s:%u"
,
pDnode
,
tsMonitorFqdn
,
tsMonitorPort
);
SMonBasicInfo
basicInfo
=
{
0
};
SMonBasicInfo
basicInfo
=
{
0
};
dndGetBasicInfo
(
pDnode
,
&
basicInfo
);
if
(
dndGetBasicInfo
(
pDnode
,
&
basicInfo
)
==
0
)
{
monSetBasicInfo
(
pMonitor
,
&
basicInfo
);
monSetBasicInfo
(
pMonitor
,
&
basicInfo
);
}
SMonClusterInfo
clusterInfo
=
{
0
};
SMonClusterInfo
clusterInfo
=
{
0
};
SMonVgroupInfo
vgroupInfo
=
{
0
};
SMonVgroupInfo
vgroupInfo
=
{
0
};
...
@@ -499,6 +503,16 @@ static void dndSendMonitorReport(SDnode *pDnode) {
...
@@ -499,6 +503,16 @@ static void dndSendMonitorReport(SDnode *pDnode) {
monSetGrantInfo
(
pMonitor
,
&
grantInfo
);
monSetGrantInfo
(
pMonitor
,
&
grantInfo
);
}
}
SMonDnodeInfo
dnodeInfo
=
{
0
};
if
(
dndGetDnodeInfo
(
pDnode
,
&
dnodeInfo
)
==
0
)
{
monSetDnodeInfo
(
pMonitor
,
&
dnodeInfo
);
}
SMonDiskInfo
diskInfo
=
{
0
};
if
(
dndGetDiskInfo
(
pDnode
,
&
diskInfo
)
==
0
)
{
monSetDiskInfo
(
pMonitor
,
&
diskInfo
);
}
monSendReport
(
pMonitor
);
monSendReport
(
pMonitor
);
monCleanupMonitorInfo
(
pMonitor
);
monCleanupMonitorInfo
(
pMonitor
);
}
}
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
f7239014
...
@@ -122,30 +122,53 @@ typedef struct TFileCacheKey {
...
@@ -122,30 +122,53 @@ typedef struct TFileCacheKey {
int
indexFlushCacheToTFile
(
SIndex
*
sIdx
,
void
*
);
int
indexFlushCacheToTFile
(
SIndex
*
sIdx
,
void
*
);
int32_t
indexSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
);
int32_t
indexSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
#define indexFatal(...) \
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLog("index FATAL ", 255, __VA_ARGS__); \
} \
} while (0)
#define indexError(...) \
do { \
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLog("index ERROR ", 255, __VA_ARGS__); \
} \
} while (0)
} while (0)
#define indexError(...) \
#define indexWarn(...) \
do { \
do { \
if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("index WARN ", 255, __VA_ARGS__); \
} \
} while (0)
} while (0)
#define indexWarn(...) \
#define indexInfo(...) \
do { \
do { \
if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("index ", 255, __VA_ARGS__); \
} \
} while (0)
} while (0)
#define indexInfo(...) \
#define indexDebug(...) \
do { \
do { \
if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0)
} while (0)
#define indexDebug(...) \
#define indexTrace(...) \
do { \
do { \
if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0)
} while (0)
#define indexTrace(...) \
do { \
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
#define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
do { \
uint8_t oldTy = ty; \
ty = (ty >> 4) | exTy; \
ty = (ty << 4) | oldTy; \
} while (0)
} while (0)
#ifdef __cplusplus
#ifdef __cplusplus
...
...
source/libs/index/inc/index_comm.h
0 → 100644
浏览文件 @
f7239014
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_INDEX_COMM_H_
#define _TD_INDEX_COMM_H_
#ifdef __cplusplus
extern
"C"
{
#endif
extern
char
JSON_COLUMN
[];
extern
char
JSON_VALUE_DELIM
;
char
*
indexPackJsonData
(
SIndexTerm
*
itm
);
#ifdef __cplusplus
}
#endif
#endif
source/libs/index/src/index.c
浏览文件 @
f7239014
...
@@ -2,8 +2,8 @@
...
@@ -2,8 +2,8 @@
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
*
* This program is free software: you can use, redistribute, and/or modify
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free
*
or later ("AGPL"), as published by the Free
Software Foundation.
* Software Foundation.
*
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
...
@@ -30,6 +30,8 @@
...
@@ -30,6 +30,8 @@
void
*
indexQhandle
=
NULL
;
void
*
indexQhandle
=
NULL
;
static
char
JSON_COLUMN
[]
=
"JSON"
;
void
indexInit
()
{
void
indexInit
()
{
// refactor later
// refactor later
indexQhandle
=
taosInitScheduler
(
INDEX_QUEUE_SIZE
,
INDEX_NUM_OF_THREADS
,
"index"
);
indexQhandle
=
taosInitScheduler
(
INDEX_QUEUE_SIZE
,
INDEX_NUM_OF_THREADS
,
"index"
);
...
@@ -63,6 +65,9 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
...
@@ -63,6 +65,9 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
static
void
indexMergeCacheAndTFile
(
SArray
*
result
,
IterateValue
*
icache
,
IterateValue
*
iTfv
);
static
void
indexMergeCacheAndTFile
(
SArray
*
result
,
IterateValue
*
icache
,
IterateValue
*
iTfv
);
static
void
indexMergeSameKey
(
SArray
*
result
,
TFileValue
*
tv
);
static
void
indexMergeSameKey
(
SArray
*
result
,
TFileValue
*
tv
);
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
pthread_once
(
&
isInit
,
indexInit
);
pthread_once
(
&
isInit
,
indexInit
);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
...
@@ -148,7 +153,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
...
@@ -148,7 +153,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
)};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
)
,
.
colType
=
p
->
colType
};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
...
@@ -163,7 +168,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
...
@@ -163,7 +168,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
)};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
)
,
.
colType
=
p
->
colType
};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
...
@@ -330,8 +335,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
...
@@ -330,8 +335,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
IndexCache
*
cache
=
NULL
;
IndexCache
*
cache
=
NULL
;
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
term
->
suid
,
.
colName
=
term
->
colName
,
.
nColName
=
strlen
(
term
->
colName
)};
ICacheKey
key
=
{
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
.
suid
=
term
->
suid
,
.
colName
=
term
->
colName
,
.
nColName
=
strlen
(
term
->
colName
),
.
colType
=
term
->
colType
};
int32_t
sz
=
indexSerialCacheKey
(
&
key
,
buf
);
pthread_mutex_lock
(
&
sIdx
->
mtx
);
pthread_mutex_lock
(
&
sIdx
->
mtx
);
IndexCache
**
pCache
=
taosHashGet
(
sIdx
->
colObj
,
buf
,
sz
);
IndexCache
**
pCache
=
taosHashGet
(
sIdx
->
colObj
,
buf
,
sz
);
...
@@ -555,11 +561,17 @@ END:
...
@@ -555,11 +561,17 @@ END:
}
}
int32_t
indexSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
)
{
int32_t
indexSerialCacheKey
(
ICacheKey
*
key
,
char
*
buf
)
{
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
key
->
colType
,
TSDB_DATA_TYPE_JSON
);
char
*
p
=
buf
;
char
*
p
=
buf
;
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
suid
);
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
suid
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF
(
buf
,
key
,
colName
,
key
->
nColName
);
if
(
hasJson
)
{
SERIALIZE_STR_VAR_TO_BUF
(
buf
,
JSON_COLUMN
,
strlen
(
JSON_COLUMN
));
}
else
{
SERIALIZE_STR_MEM_TO_BUF
(
buf
,
key
,
colName
,
key
->
nColName
);
}
return
buf
-
p
;
return
buf
-
p
;
}
}
source/libs/index/src/index_cache.c
浏览文件 @
f7239014
...
@@ -14,6 +14,7 @@
...
@@ -14,6 +14,7 @@
*/
*/
#include "index_cache.h"
#include "index_cache.h"
#include "index_comm.h"
#include "index_util.h"
#include "index_util.h"
#include "tcompare.h"
#include "tcompare.h"
#include "tsched.h"
#include "tsched.h"
...
@@ -44,8 +45,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
...
@@ -44,8 +45,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
indexError
(
"failed to create index cache"
);
indexError
(
"failed to create index cache"
);
return
NULL
;
return
NULL
;
};
};
cache
->
mem
=
indexInternalCacheCreate
(
type
);
cache
->
mem
=
indexInternalCacheCreate
(
type
);
cache
->
colName
=
tstrdup
(
colName
);
cache
->
colName
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
type
,
TSDB_DATA_TYPE_JSON
)
?
tstrdup
(
JSON_COLUMN
)
:
tstrdup
(
colName
);
cache
->
type
=
type
;
cache
->
type
=
type
;
cache
->
index
=
idx
;
cache
->
index
=
idx
;
cache
->
version
=
0
;
cache
->
version
=
0
;
...
@@ -207,11 +209,11 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
...
@@ -207,11 +209,11 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
}
}
}
}
}
}
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
int
indexCachePut
(
void
*
cache
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
if
(
cache
==
NULL
)
{
if
(
cache
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
);
IndexCache
*
pCache
=
cache
;
IndexCache
*
pCache
=
cache
;
indexCacheRef
(
pCache
);
indexCacheRef
(
pCache
);
...
@@ -222,8 +224,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
...
@@ -222,8 +224,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
}
}
// set up key
// set up key
ct
->
colType
=
term
->
colType
;
ct
->
colType
=
term
->
colType
;
ct
->
colVal
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
(
term
->
nColVal
+
1
));
if
(
hasJson
)
{
memcpy
(
ct
->
colVal
,
term
->
colVal
,
term
->
nColVal
);
ct
->
colVal
=
indexPackJsonData
(
term
);
}
else
{
ct
->
colVal
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
(
term
->
nColVal
+
1
));
memcpy
(
ct
->
colVal
,
term
->
colVal
,
term
->
nColVal
);
}
ct
->
version
=
atomic_add_fetch_32
(
&
pCache
->
version
,
1
);
ct
->
version
=
atomic_add_fetch_32
(
&
pCache
->
version
,
1
);
// set value
// set value
ct
->
uid
=
uid
;
ct
->
uid
=
uid
;
...
@@ -294,13 +300,22 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
...
@@ -294,13 +300,22 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
SIndexTerm
*
term
=
query
->
term
;
SIndexTerm
*
term
=
query
->
term
;
EIndexQueryType
qtype
=
query
->
qType
;
EIndexQueryType
qtype
=
query
->
qType
;
CacheTerm
ct
=
{.
colVal
=
term
->
colVal
,
.
version
=
atomic_load_32
(
&
pCache
->
version
)};
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
);
char
*
p
=
term
->
colVal
;
if
(
hasJson
)
{
p
=
indexPackJsonData
(
term
);
}
CacheTerm
ct
=
{.
colVal
=
p
,
.
version
=
atomic_load_32
(
&
pCache
->
version
)};
int
ret
=
indexQueryMem
(
mem
,
&
ct
,
qtype
,
result
,
s
);
int
ret
=
indexQueryMem
(
mem
,
&
ct
,
qtype
,
result
,
s
);
if
(
ret
==
0
&&
*
s
!=
kTypeDeletion
)
{
if
(
ret
==
0
&&
*
s
!=
kTypeDeletion
)
{
// continue search in imm
// continue search in imm
ret
=
indexQueryMem
(
imm
,
&
ct
,
qtype
,
result
,
s
);
ret
=
indexQueryMem
(
imm
,
&
ct
,
qtype
,
result
,
s
);
}
}
if
(
hasJson
)
{
tfree
(
p
);
}
indexMemUnRef
(
mem
);
indexMemUnRef
(
mem
);
indexMemUnRef
(
imm
);
indexMemUnRef
(
imm
);
...
@@ -367,6 +382,8 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
...
@@ -367,6 +382,8 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
}
}
static
MemTable
*
indexInternalCacheCreate
(
int8_t
type
)
{
static
MemTable
*
indexInternalCacheCreate
(
int8_t
type
)
{
type
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
type
,
TSDB_DATA_TYPE_JSON
)
?
TSDB_DATA_TYPE_BINARY
:
type
;
MemTable
*
tbl
=
calloc
(
1
,
sizeof
(
MemTable
));
MemTable
*
tbl
=
calloc
(
1
,
sizeof
(
MemTable
));
indexMemRef
(
tbl
);
indexMemRef
(
tbl
);
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
...
@@ -389,9 +406,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
...
@@ -389,9 +406,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
IterateValue
*
iv
=
&
itera
->
val
;
IterateValue
*
iv
=
&
itera
->
val
;
iterateValueDestroy
(
iv
,
false
);
iterateValueDestroy
(
iv
,
false
);
// IterateValue* iv = &itera->val;
// IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))};
bool
next
=
tSkipListIterNext
(
iter
);
bool
next
=
tSkipListIterNext
(
iter
);
if
(
next
)
{
if
(
next
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
...
@@ -411,10 +425,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
...
@@ -411,10 +425,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
taosArrayPush
(
iv
->
val
,
&
ct
->
uid
);
taosArrayPush
(
iv
->
val
,
&
ct
->
uid
);
}
}
// IterateValue* iv = &itera->val;
// iterateValueDestroy(iv, true);
//*iv = tIterVal;
return
next
;
return
next
;
}
}
...
...
source/libs/index/src/index_comm.c
0 → 100644
浏览文件 @
f7239014
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "index.h"
#include "indexInt.h"
char
JSON_COLUMN
[]
=
"JSON"
;
char
JSON_VALUE_DELIM
=
'&'
;
char
*
indexPackJsonData
(
SIndexTerm
*
itm
)
{
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t
ty
=
INDEX_TYPE_GET_TYPE
(
itm
->
colType
);
int32_t
sz
=
itm
->
nColName
+
itm
->
nColVal
+
sizeof
(
uint8_t
)
+
sizeof
(
JSON_VALUE_DELIM
)
*
2
+
1
;
char
*
buf
=
(
char
*
)
calloc
(
1
,
sz
);
char
*
p
=
buf
;
memcpy
(
p
,
itm
->
colName
,
itm
->
nColName
);
p
+=
itm
->
nColName
;
memcpy
(
p
,
&
JSON_VALUE_DELIM
,
sizeof
(
JSON_VALUE_DELIM
));
p
+=
sizeof
(
JSON_VALUE_DELIM
);
memcpy
(
p
,
&
ty
,
sizeof
(
ty
));
p
+=
sizeof
(
ty
);
memcpy
(
p
,
&
JSON_VALUE_DELIM
,
sizeof
(
JSON_VALUE_DELIM
));
p
+=
sizeof
(
JSON_VALUE_DELIM
);
memcpy
(
p
,
itm
->
colVal
,
itm
->
nColVal
);
return
buf
;
}
source/libs/index/src/index_json.c
0 → 100644
浏览文件 @
f7239014
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "index.h"
#include "indexInt.h"
int
tIndexJsonOpen
(
SIndexJsonOpts
*
opts
,
const
char
*
path
,
SIndexJson
**
index
)
{
// handle
return
indexOpen
(
opts
,
path
,
index
);
}
int
tIndexJsonPut
(
SIndexJson
*
index
,
SIndexJsonMultiTerm
*
terms
,
uint64_t
uid
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
terms
);
i
++
)
{
SIndexJsonTerm
*
p
=
taosArrayGetP
(
terms
,
i
);
INDEX_TYPE_ADD_EXTERN_TYPE
(
p
->
colType
,
TSDB_DATA_TYPE_JSON
);
}
return
indexPut
(
index
,
terms
,
uid
);
// handle put
}
int
tIndexJsonSearch
(
SIndexJson
*
index
,
SIndexJsonMultiTermQuery
*
tq
,
SArray
*
result
)
{
SArray
*
terms
=
tq
->
query
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
terms
);
i
++
)
{
SIndexJsonTerm
*
p
=
taosArrayGetP
(
terms
,
i
);
INDEX_TYPE_ADD_EXTERN_TYPE
(
p
->
colType
,
TSDB_DATA_TYPE_JSON
);
}
return
indexSearch
(
index
,
tq
,
result
);
// handle search
}
void
tIndexJsonClose
(
SIndexJson
*
index
)
{
return
indexClose
(
index
);
// handle close
}
source/libs/index/src/index_tfile.c
浏览文件 @
f7239014
...
@@ -15,6 +15,7 @@ p *
...
@@ -15,6 +15,7 @@ p *
#include "index_tfile.h"
#include "index_tfile.h"
#include "index.h"
#include "index.h"
#include "index_comm.h"
#include "index_fst.h"
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_fst_counting_writer.h"
#include "index_util.h"
#include "index_util.h"
...
@@ -186,13 +187,20 @@ void tfileReaderDestroy(TFileReader* reader) {
...
@@ -186,13 +187,20 @@ void tfileReaderDestroy(TFileReader* reader) {
int
tfileReaderSearch
(
TFileReader
*
reader
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
int
tfileReaderSearch
(
TFileReader
*
reader
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
SIndexTerm
*
term
=
query
->
term
;
SIndexTerm
*
term
=
query
->
term
;
bool
hasJson
=
INDEX_TYPE_CONTAIN_EXTERN_TYPE
(
term
->
colType
,
TSDB_DATA_TYPE_JSON
);
EIndexQueryType
qtype
=
query
->
qType
;
EIndexQueryType
qtype
=
query
->
qType
;
int
ret
=
-
1
;
int
ret
=
-
1
;
// refactor to callback later
// refactor to callback later
if
(
qtype
==
QUERY_TERM
)
{
if
(
qtype
==
QUERY_TERM
)
{
uint64_t
offset
;
uint64_t
offset
;
FstSlice
key
=
fstSliceCreate
(
term
->
colVal
,
term
->
nColVal
);
char
*
p
=
term
->
colVal
;
uint64_t
sz
=
term
->
nColVal
;
if
(
hasJson
)
{
p
=
indexPackJsonData
(
term
);
sz
=
strlen
(
p
);
}
FstSlice
key
=
fstSliceCreate
(
p
,
sz
);
if
(
fstGet
(
reader
->
fst
,
&
key
,
&
offset
))
{
if
(
fstGet
(
reader
->
fst
,
&
key
,
&
offset
))
{
indexInfo
(
"index: %"
PRIu64
", col: %s, colVal: %s, found table info in tindex"
,
term
->
suid
,
term
->
colName
,
indexInfo
(
"index: %"
PRIu64
", col: %s, colVal: %s, found table info in tindex"
,
term
->
suid
,
term
->
colName
,
term
->
colVal
);
term
->
colVal
);
...
@@ -202,10 +210,17 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
...
@@ -202,10 +210,17 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
term
->
colVal
);
term
->
colVal
);
}
}
fstSliceDestroy
(
&
key
);
fstSliceDestroy
(
&
key
);
if
(
hasJson
)
{
free
(
p
);
}
}
else
if
(
qtype
==
QUERY_PREFIX
)
{
}
else
if
(
qtype
==
QUERY_PREFIX
)
{
// handle later
// handle later
//
//
}
else
{
}
else
if
(
qtype
==
QUERY_SUFFIX
)
{
// handle later
}
else
if
(
qtype
==
QUERY_REGEX
)
{
// handle later
}
else
if
(
qtype
==
QUERY_RANGE
)
{
// handle later
// handle later
}
}
tfileReaderUnRef
(
reader
);
tfileReaderUnRef
(
reader
);
...
@@ -260,6 +275,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
...
@@ -260,6 +275,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
__compar_fn_t
fn
;
__compar_fn_t
fn
;
int8_t
colType
=
tw
->
header
.
colType
;
int8_t
colType
=
tw
->
header
.
colType
;
colType
=
INDEX_TYPE_GET_TYPE
(
colType
);
if
(
colType
==
TSDB_DATA_TYPE_BINARY
||
colType
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
colType
==
TSDB_DATA_TYPE_BINARY
||
colType
==
TSDB_DATA_TYPE_NCHAR
)
{
fn
=
tfileStrCompare
;
fn
=
tfileStrCompare
;
}
else
{
}
else
{
...
@@ -557,6 +573,8 @@ static int tfileWriteHeader(TFileWriter* writer) {
...
@@ -557,6 +573,8 @@ static int tfileWriteHeader(TFileWriter* writer) {
static
int
tfileWriteData
(
TFileWriter
*
write
,
TFileValue
*
tval
)
{
static
int
tfileWriteData
(
TFileWriter
*
write
,
TFileValue
*
tval
)
{
TFileHeader
*
header
=
&
write
->
header
;
TFileHeader
*
header
=
&
write
->
header
;
uint8_t
colType
=
header
->
colType
;
uint8_t
colType
=
header
->
colType
;
colType
=
INDEX_TYPE_GET_TYPE
(
colType
);
if
(
colType
==
TSDB_DATA_TYPE_BINARY
||
colType
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
colType
==
TSDB_DATA_TYPE_BINARY
||
colType
==
TSDB_DATA_TYPE_NCHAR
)
{
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)(
tval
->
colVal
),
(
size_t
)
strlen
(
tval
->
colVal
));
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)(
tval
->
colVal
),
(
size_t
)
strlen
(
tval
->
colVal
));
if
(
fstBuilderInsert
(
write
->
fb
,
key
,
tval
->
offset
))
{
if
(
fstBuilderInsert
(
write
->
fb
,
key
,
tval
->
offset
))
{
...
@@ -586,11 +604,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
...
@@ -586,11 +604,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
int64_t
nread
=
reader
->
ctx
->
readFrom
(
reader
->
ctx
,
buf
,
sizeof
(
buf
),
0
);
int64_t
nread
=
reader
->
ctx
->
readFrom
(
reader
->
ctx
,
buf
,
sizeof
(
buf
),
0
);
if
(
nread
==
-
1
)
{
if
(
nread
==
-
1
)
{
indexError
(
"actual Read: %d, to read: %d, errno: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
indexError
(
"actual Read: %d, to read: %d, errno: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
errno
,
errno
,
reader
->
ctx
->
file
.
buf
);
reader
->
ctx
->
file
.
buf
);
}
else
{
}
else
{
indexInfo
(
"actual Read: %d, to read: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
indexInfo
(
"actual Read: %d, to read: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
reader
->
ctx
->
file
.
buf
);
reader
->
ctx
->
file
.
buf
);
}
}
// assert(nread == sizeof(buf));
// assert(nread == sizeof(buf));
memcpy
(
&
reader
->
header
,
buf
,
sizeof
(
buf
));
memcpy
(
&
reader
->
header
,
buf
,
sizeof
(
buf
));
...
...
source/libs/index/test/CMakeLists.txt
浏览文件 @
f7239014
...
@@ -2,6 +2,7 @@ add_executable(indexTest "")
...
@@ -2,6 +2,7 @@ add_executable(indexTest "")
add_executable
(
fstTest
""
)
add_executable
(
fstTest
""
)
add_executable
(
fstUT
""
)
add_executable
(
fstUT
""
)
add_executable
(
UtilUT
""
)
add_executable
(
UtilUT
""
)
add_executable
(
jsonUT
""
)
target_sources
(
indexTest
target_sources
(
indexTest
PRIVATE
PRIVATE
...
@@ -21,6 +22,10 @@ target_sources(UtilUT
...
@@ -21,6 +22,10 @@ target_sources(UtilUT
"utilUT.cc"
"utilUT.cc"
)
)
target_sources
(
jsonUT
PRIVATE
"jsonUT.cc"
)
target_include_directories
(
indexTest
target_include_directories
(
indexTest
PUBLIC
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
...
@@ -43,6 +48,12 @@ target_include_directories ( UtilUT
...
@@ -43,6 +48,12 @@ target_include_directories ( UtilUT
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
)
target_include_directories
(
jsonUT
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
indexTest
target_link_libraries
(
indexTest
os
os
util
util
...
@@ -73,6 +84,13 @@ target_link_libraries (UtilUT
...
@@ -73,6 +84,13 @@ target_link_libraries (UtilUT
index
index
)
)
target_link_libraries
(
jsonUT
os
util
common
gtest_main
index
)
#add_test(
#add_test(
# NAME index_test
# NAME index_test
...
...
source/libs/index/test/fstTest.cc
浏览文件 @
f7239014
...
@@ -301,13 +301,18 @@ void validateTFile(char* arg) {
...
@@ -301,13 +301,18 @@ void validateTFile(char* arg) {
}
}
}
}
void
iterTFileReader
(
char
*
path
,
char
*
ver
)
{
void
iterTFileReader
(
char
*
path
,
char
*
uid
,
char
*
colName
,
char
*
ver
)
{
int
version
=
atoi
(
ver
);
// tfInit();
TFileReader
*
reader
=
tfileReaderOpen
(
path
,
0
,
version
,
"tag1"
);
Iterate
*
iter
=
tfileIteratorCreate
(
reader
);
uint64_t
suid
=
atoi
(
uid
);
bool
tn
=
iter
?
iter
->
next
(
iter
)
:
false
;
int
version
=
atoi
(
ver
);
int
count
=
0
;
int
termCount
=
0
;
TFileReader
*
reader
=
tfileReaderOpen
(
path
,
suid
,
version
,
colName
);
Iterate
*
iter
=
tfileIteratorCreate
(
reader
);
bool
tn
=
iter
?
iter
->
next
(
iter
)
:
false
;
int
count
=
0
;
int
termCount
=
0
;
while
(
tn
==
true
)
{
while
(
tn
==
true
)
{
count
++
;
count
++
;
IterateValue
*
cv
=
iter
->
getValue
(
iter
);
IterateValue
*
cv
=
iter
->
getValue
(
iter
);
...
@@ -323,9 +328,9 @@ void iterTFileReader(char* path, char* ver) {
...
@@ -323,9 +328,9 @@ void iterTFileReader(char* path, char* ver) {
int
main
(
int
argc
,
char
*
argv
[])
{
int
main
(
int
argc
,
char
*
argv
[])
{
// tool to check all kind of fst test
// tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); }
// if (argc > 1) { validateTFile(argv[1]); }
if
(
argc
>
2
)
{
if
(
argc
>
4
)
{
//
opt
//
path suid colName ver
iterTFileReader
(
argv
[
1
],
argv
[
2
]);
iterTFileReader
(
argv
[
1
],
argv
[
2
]
,
argv
[
3
],
argv
[
4
]
);
}
}
// checkFstCheckIterator();
// checkFstCheckIterator();
// checkFstLongTerm();
// checkFstLongTerm();
...
...
source/libs/index/test/fstUT.cc
浏览文件 @
f7239014
...
@@ -213,21 +213,21 @@ class FstEnv : public ::testing::Test {
...
@@ -213,21 +213,21 @@ class FstEnv : public ::testing::Test {
TEST_F
(
FstEnv
,
writeNormal
)
{
TEST_F
(
FstEnv
,
writeNormal
)
{
fst
->
CreateWriter
();
fst
->
CreateWriter
();
std
::
string
str
(
"
aa
"
);
std
::
string
str
(
"
11
"
);
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
str
[
0
]
=
'
a
'
+
i
;
str
[
0
]
=
'
1
'
+
i
;
str
.
resize
(
2
);
str
.
resize
(
2
);
assert
(
fst
->
Put
(
str
,
i
)
==
true
);
assert
(
fst
->
Put
(
str
,
i
)
==
true
);
}
}
// order failed
// order failed
assert
(
fst
->
Put
(
"
aa
"
,
1
)
==
false
);
assert
(
fst
->
Put
(
"
11
"
,
1
)
==
false
);
fst
->
DestroyWriter
();
fst
->
DestroyWriter
();
fst
->
CreateReader
();
fst
->
CreateReader
();
uint64_t
val
;
uint64_t
val
;
assert
(
fst
->
Get
(
"
a
"
,
&
val
)
==
false
);
assert
(
fst
->
Get
(
"
1
"
,
&
val
)
==
false
);
assert
(
fst
->
Get
(
"
aa
"
,
&
val
)
==
true
);
assert
(
fst
->
Get
(
"
11
"
,
&
val
)
==
true
);
assert
(
val
==
0
);
assert
(
val
==
0
);
std
::
vector
<
uint64_t
>
rlt
;
std
::
vector
<
uint64_t
>
rlt
;
...
@@ -235,3 +235,19 @@ TEST_F(FstEnv, writeNormal) {
...
@@ -235,3 +235,19 @@ TEST_F(FstEnv, writeNormal) {
assert
(
fst
->
Search
(
ctx
,
rlt
)
==
true
);
assert
(
fst
->
Search
(
ctx
,
rlt
)
==
true
);
}
}
TEST_F
(
FstEnv
,
WriteMillonrRecord
)
{}
TEST_F
(
FstEnv
,
WriteMillonrRecord
)
{}
TEST_F
(
FstEnv
,
writeAbNormal
)
{
fst
->
CreateWriter
();
std
::
string
str1
(
"voltage&
\b
&ab"
);
std
::
string
str2
(
"voltbge&
\b
&ab"
);
fst
->
Put
(
str1
,
1
);
fst
->
Put
(
str2
,
2
);
fst
->
DestroyWriter
();
fst
->
CreateReader
();
uint64_t
val
;
assert
(
fst
->
Get
(
"1"
,
&
val
)
==
false
);
assert
(
fst
->
Get
(
"voltage&
\b
&ab"
,
&
val
)
==
true
);
assert
(
val
==
1
);
}
source/libs/index/test/jsonUT.cc
0 → 100644
浏览文件 @
f7239014
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tglobal.h"
#include "tskiplist.h"
#include "tutil.h"
static
std
::
string
dir
=
"/tmp/json"
;
class
JsonEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
taosRemoveDir
(
dir
.
c_str
());
taosMkDir
(
dir
.
c_str
());
printf
(
"set up
\n
"
);
opts
=
indexOptsCreate
();
int
ret
=
tIndexJsonOpen
(
opts
,
dir
.
c_str
(),
&
index
);
assert
(
ret
==
0
);
}
virtual
void
TearDown
()
{
tIndexJsonClose
(
index
);
indexOptsDestroy
(
opts
);
printf
(
"destory
\n
"
);
}
SIndexJsonOpts
*
opts
;
SIndexJson
*
index
;
};
TEST_F
(
JsonEnv
,
testWrite
)
{
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"ab"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
100
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"voltage"
);
std
::
string
colVal
(
"ab1"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
100
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"voltage"
);
std
::
string
colVal
(
"123"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
100
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"ab"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
tIndexJsonSearch
(
index
,
mq
,
result
);
assert
(
100
==
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
}
TEST_F
(
JsonEnv
,
testWriteMillonData
)
{
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"ab"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
100
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"voltagefdadfa"
);
std
::
string
colVal
(
"abxxxxxxxxxxxx"
);
SIndexTerm
*
term
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
1000000
;
i
++
)
{
tIndexJsonPut
(
index
,
terms
,
i
);
}
indexMultiTermDestroy
(
terms
);
}
{
std
::
string
colName
(
"test"
);
std
::
string
colVal
(
"ab"
);
SIndexMultiTermQuery
*
mq
=
indexMultiTermQueryCreate
(
MUST
);
SIndexTerm
*
q
=
indexTermCreate
(
1
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SArray
*
result
=
taosArrayInit
(
1
,
sizeof
(
uint64_t
));
indexMultiTermQueryAdd
(
mq
,
q
,
QUERY_TERM
);
tIndexJsonSearch
(
index
,
mq
,
result
);
assert
(
100
==
taosArrayGetSize
(
result
));
indexMultiTermQueryDestroy
(
mq
);
}
}
source/libs/monitor/src/monitor.c
浏览文件 @
f7239014
...
@@ -23,7 +23,7 @@
...
@@ -23,7 +23,7 @@
static
SMonitor
tsMonitor
=
{
0
};
static
SMonitor
tsMonitor
=
{
0
};
int32_t
monInit
(
const
SMonCfg
*
pCfg
)
{
int32_t
monInit
(
const
SMonCfg
*
pCfg
)
{
tsMonitor
.
logs
=
taosArrayInit
(
16
,
sizeof
(
SMon
Info
));
tsMonitor
.
logs
=
taosArrayInit
(
16
,
sizeof
(
SMon
LogItem
));
if
(
tsMonitor
.
logs
==
NULL
)
{
if
(
tsMonitor
.
logs
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
...
@@ -44,7 +44,7 @@ void monCleanup() {
...
@@ -44,7 +44,7 @@ void monCleanup() {
void
monAddLogItem
(
SMonLogItem
*
pItem
)
{
void
monAddLogItem
(
SMonLogItem
*
pItem
)
{
taosWLockLatch
(
&
tsMonitor
.
lock
);
taosWLockLatch
(
&
tsMonitor
.
lock
);
int32_t
size
=
taosArrayGetSize
(
tsMonitor
.
logs
);
int32_t
size
=
taosArrayGetSize
(
tsMonitor
.
logs
);
if
(
size
>
tsMonitor
.
maxLogs
)
{
if
(
size
>
=
tsMonitor
.
maxLogs
)
{
uInfo
(
"too many logs for monitor"
);
uInfo
(
"too many logs for monitor"
);
}
else
{
}
else
{
taosArrayPush
(
tsMonitor
.
logs
,
pItem
);
taosArrayPush
(
tsMonitor
.
logs
,
pItem
);
...
@@ -54,7 +54,10 @@ void monAddLogItem(SMonLogItem *pItem) {
...
@@ -54,7 +54,10 @@ void monAddLogItem(SMonLogItem *pItem) {
SMonInfo
*
monCreateMonitorInfo
()
{
SMonInfo
*
monCreateMonitorInfo
()
{
SMonInfo
*
pMonitor
=
calloc
(
1
,
sizeof
(
SMonInfo
));
SMonInfo
*
pMonitor
=
calloc
(
1
,
sizeof
(
SMonInfo
));
if
(
pMonitor
==
NULL
)
return
NULL
;
if
(
pMonitor
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
taosWLockLatch
(
&
tsMonitor
.
lock
);
taosWLockLatch
(
&
tsMonitor
.
lock
);
pMonitor
->
logs
=
taosArrayDup
(
tsMonitor
.
logs
);
pMonitor
->
logs
=
taosArrayDup
(
tsMonitor
.
logs
);
...
@@ -77,41 +80,257 @@ void monCleanupMonitorInfo(SMonInfo *pMonitor) {
...
@@ -77,41 +80,257 @@ void monCleanupMonitorInfo(SMonInfo *pMonitor) {
free
(
pMonitor
);
free
(
pMonitor
);
}
}
void
monSendReport
(
SMonInfo
*
pMonitor
)
{
char
*
pCont
=
tjsonToString
(
pMonitor
->
pJson
);
if
(
pCont
!=
NULL
)
{
taosSendHttpReport
(
tsMonitor
.
server
,
tsMonitor
.
port
,
pCont
,
strlen
(
pCont
));
free
(
pCont
);
}
}
void
monSetBasicInfo
(
SMonInfo
*
pMonitor
,
SMonBasicInfo
*
pInfo
)
{
void
monSetBasicInfo
(
SMonInfo
*
pMonitor
,
SMonBasicInfo
*
pInfo
)
{
SJson
*
pJson
=
pMonitor
->
pJson
;
SJson
*
pJson
=
pMonitor
->
pJson
;
tjsonAddDoubleToObject
(
pJson
,
"dnode_id"
,
pInfo
->
dnode_id
);
tjsonAddStringToObject
(
pJson
,
"dnode_ep"
,
pInfo
->
dnode_ep
);
int64_t
ms
=
taosGetTimestampMs
();
int64_t
ms
=
taosGetTimestampMs
();
char
buf
[
40
]
=
{
0
};
char
buf
[
40
]
=
{
0
};
taosFormatUtcTime
(
buf
,
sizeof
(
buf
),
ms
,
TSDB_TIME_PRECISION_MILLI
);
taosFormatUtcTime
(
buf
,
sizeof
(
buf
),
ms
,
TSDB_TIME_PRECISION_MILLI
);
tjsonAddStringToObject
(
pJson
,
"ts"
,
buf
);
tjsonAddStringToObject
(
pJson
,
"ts"
,
buf
);
tjsonAddDoubleToObject
(
pJson
,
"dnode_id"
,
pInfo
->
dnode_id
);
tjsonAddStringToObject
(
pJson
,
"dnode_ep"
,
pInfo
->
dnode_ep
);
}
}
void
monSetClusterInfo
(
SMonInfo
*
pMonitor
,
SMonClusterInfo
*
pInfo
)
{
void
monSetClusterInfo
(
SMonInfo
*
pMonitor
,
SMonClusterInfo
*
pInfo
)
{
SJson
*
pJson
=
tjsonCreateObject
();
if
(
pJson
==
NULL
)
return
;
if
(
tjsonAddItemToObject
(
pMonitor
->
pJson
,
"cluster_info"
,
pJson
)
!=
0
)
{
tjsonDelete
(
pJson
);
return
;
}
tjsonAddStringToObject
(
pJson
,
"first_ep"
,
pInfo
->
first_ep
);
tjsonAddDoubleToObject
(
pJson
,
"first_ep_dnode_id"
,
pInfo
->
first_ep_dnode_id
);
tjsonAddStringToObject
(
pJson
,
"version"
,
pInfo
->
version
);
tjsonAddDoubleToObject
(
pJson
,
"master_uptime"
,
pInfo
->
master_uptime
);
tjsonAddDoubleToObject
(
pJson
,
"monitor_interval"
,
pInfo
->
monitor_interval
);
tjsonAddDoubleToObject
(
pJson
,
"vgroups_total"
,
pInfo
->
vgroups_total
);
tjsonAddDoubleToObject
(
pJson
,
"vgroups_alive"
,
pInfo
->
vgroups_alive
);
tjsonAddDoubleToObject
(
pJson
,
"vnodes_total"
,
pInfo
->
vnodes_total
);
tjsonAddDoubleToObject
(
pJson
,
"vnodes_alive"
,
pInfo
->
vnodes_alive
);
tjsonAddDoubleToObject
(
pJson
,
"connections_total"
,
pInfo
->
connections_total
);
SJson
*
pDnodesJson
=
tjsonAddArrayToObject
(
pJson
,
"dnodes"
);
if
(
pDnodesJson
==
NULL
)
return
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
dnodes
);
++
i
)
{
SJson
*
pDnodeJson
=
tjsonCreateObject
();
if
(
pDnodeJson
==
NULL
)
continue
;
SMonDnodeDesc
*
pDnodeDesc
=
taosArrayGet
(
pInfo
->
dnodes
,
i
);
tjsonAddDoubleToObject
(
pDnodeJson
,
"dnode_id"
,
pDnodeDesc
->
dnode_id
);
tjsonAddStringToObject
(
pDnodeJson
,
"dnode_ep"
,
pDnodeDesc
->
dnode_ep
);
tjsonAddStringToObject
(
pDnodeJson
,
"status"
,
pDnodeDesc
->
status
);
if
(
tjsonAddItemToArray
(
pDnodesJson
,
pDnodeJson
)
!=
0
)
tjsonDelete
(
pDnodeJson
);
}
SJson
*
pMnodesJson
=
tjsonAddArrayToObject
(
pJson
,
"mnodes"
);
if
(
pMnodesJson
==
NULL
)
return
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
dnodes
);
++
i
)
{
SJson
*
pMnodeJson
=
tjsonCreateObject
();
if
(
pMnodeJson
==
NULL
)
continue
;
SMonMnodeDesc
*
pMnodeDesc
=
taosArrayGet
(
pInfo
->
dnodes
,
i
);
tjsonAddDoubleToObject
(
pMnodeJson
,
"mnode_id"
,
pMnodeDesc
->
mnode_id
);
tjsonAddStringToObject
(
pMnodeJson
,
"mnode_ep"
,
pMnodeDesc
->
mnode_ep
);
tjsonAddStringToObject
(
pMnodeJson
,
"role"
,
pMnodeDesc
->
role
);
if
(
tjsonAddItemToArray
(
pMnodesJson
,
pMnodeJson
)
!=
0
)
tjsonDelete
(
pMnodeJson
);
}
}
}
void
monSetVgroupInfo
(
SMonInfo
*
pMonitor
,
SMonVgroupInfo
*
pInfo
)
{
void
monSetVgroupInfo
(
SMonInfo
*
pMonitor
,
SMonVgroupInfo
*
pInfo
)
{
SJson
*
pJson
=
tjsonAddArrayToObject
(
pMonitor
->
pJson
,
"vgroup_infos"
);
if
(
pJson
==
NULL
)
return
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
vgroups
);
++
i
)
{
SJson
*
pVgroupJson
=
tjsonCreateObject
();
if
(
pVgroupJson
==
NULL
)
continue
;
if
(
tjsonAddItemToArray
(
pJson
,
pVgroupJson
)
!=
0
)
{
tjsonDelete
(
pVgroupJson
);
continue
;
}
SMonVgroupDesc
*
pVgroupDesc
=
taosArrayGet
(
pInfo
->
vgroups
,
i
);
tjsonAddDoubleToObject
(
pVgroupJson
,
"vgroup_id"
,
pVgroupDesc
->
vgroup_id
);
tjsonAddStringToObject
(
pVgroupJson
,
"database_name"
,
pVgroupDesc
->
database_name
);
tjsonAddDoubleToObject
(
pVgroupJson
,
"tables_num"
,
pVgroupDesc
->
tables_num
);
tjsonAddStringToObject
(
pVgroupJson
,
"status"
,
pVgroupDesc
->
status
);
SJson
*
pVnodesJson
=
tjsonAddArrayToObject
(
pVgroupJson
,
"vnodes"
);
if
(
pVnodesJson
==
NULL
)
continue
;
for
(
int32_t
j
=
0
;
j
<
TSDB_MAX_REPLICA
;
++
j
)
{
SMonVnodeDesc
*
pVnodeDesc
=
&
pVgroupDesc
->
vnodes
[
j
];
if
(
pVnodeDesc
->
dnode_id
<=
0
)
continue
;
SJson
*
pVnodeJson
=
tjsonCreateObject
();
if
(
pVnodeJson
==
NULL
)
continue
;
tjsonAddDoubleToObject
(
pVnodeJson
,
"dnode_id"
,
pVnodeDesc
->
dnode_id
);
tjsonAddStringToObject
(
pVnodeJson
,
"vnode_role"
,
pVnodeDesc
->
vnode_role
);
if
(
tjsonAddItemToArray
(
pVnodesJson
,
pVnodeJson
)
!=
0
)
tjsonDelete
(
pVnodeJson
);
}
}
}
}
void
monSetGrantInfo
(
SMonInfo
*
pMonitor
,
SMonGrantInfo
*
pInfo
)
{
void
monSetGrantInfo
(
SMonInfo
*
pMonitor
,
SMonGrantInfo
*
pInfo
)
{
SJson
*
pJson
=
tjsonCreateObject
();
if
(
pJson
==
NULL
)
return
;
if
(
tjsonAddItemToObject
(
pMonitor
->
pJson
,
"grant_info"
,
pJson
)
!=
0
)
{
tjsonDelete
(
pJson
);
return
;
}
tjsonAddDoubleToObject
(
pJson
,
"expire_time"
,
pInfo
->
expire_time
);
tjsonAddDoubleToObject
(
pJson
,
"timeseries_used"
,
pInfo
->
timeseries_used
);
tjsonAddDoubleToObject
(
pJson
,
"timeseries_total"
,
pInfo
->
timeseries_total
);
}
}
void
monSetDnodeInfo
(
SMonInfo
*
pMonitor
,
SMonDnodeInfo
*
pInfo
)
{
void
monSetDnodeInfo
(
SMonInfo
*
pMonitor
,
SMonDnodeInfo
*
pInfo
)
{
SJson
*
pJson
=
tjsonCreateObject
();
if
(
pJson
==
NULL
)
return
;
if
(
tjsonAddItemToObject
(
pMonitor
->
pJson
,
"dnode_info"
,
pJson
)
!=
0
)
{
tjsonDelete
(
pJson
);
return
;
}
tjsonAddDoubleToObject
(
pJson
,
"uptime"
,
pInfo
->
uptime
);
tjsonAddDoubleToObject
(
pJson
,
"cpu_engine"
,
pInfo
->
cpu_engine
);
tjsonAddDoubleToObject
(
pJson
,
"cpu_system"
,
pInfo
->
cpu_system
);
tjsonAddDoubleToObject
(
pJson
,
"cpu_cores"
,
pInfo
->
cpu_cores
);
tjsonAddDoubleToObject
(
pJson
,
"mem_engine"
,
pInfo
->
mem_engine
);
tjsonAddDoubleToObject
(
pJson
,
"mem_system"
,
pInfo
->
mem_system
);
tjsonAddDoubleToObject
(
pJson
,
"mem_total"
,
pInfo
->
mem_total
);
tjsonAddDoubleToObject
(
pJson
,
"disk_engine"
,
pInfo
->
disk_engine
);
tjsonAddDoubleToObject
(
pJson
,
"disk_used"
,
pInfo
->
disk_used
);
tjsonAddDoubleToObject
(
pJson
,
"disk_total"
,
pInfo
->
disk_total
);
tjsonAddDoubleToObject
(
pJson
,
"net_in"
,
pInfo
->
net_in
);
tjsonAddDoubleToObject
(
pJson
,
"net_out"
,
pInfo
->
net_out
);
tjsonAddDoubleToObject
(
pJson
,
"io_read"
,
pInfo
->
io_read
);
tjsonAddDoubleToObject
(
pJson
,
"io_write"
,
pInfo
->
io_write
);
tjsonAddDoubleToObject
(
pJson
,
"io_read_disk"
,
pInfo
->
io_read_disk
);
tjsonAddDoubleToObject
(
pJson
,
"io_write_disk"
,
pInfo
->
io_write_disk
);
tjsonAddDoubleToObject
(
pJson
,
"req_select"
,
pInfo
->
req_select
);
tjsonAddDoubleToObject
(
pJson
,
"req_select_rate"
,
pInfo
->
req_select_rate
);
tjsonAddDoubleToObject
(
pJson
,
"req_insert"
,
pInfo
->
req_insert
);
tjsonAddDoubleToObject
(
pJson
,
"req_insert_success"
,
pInfo
->
req_insert_success
);
tjsonAddDoubleToObject
(
pJson
,
"req_insert_rate"
,
pInfo
->
req_insert_rate
);
tjsonAddDoubleToObject
(
pJson
,
"req_insert_batch"
,
pInfo
->
req_insert_batch
);
tjsonAddDoubleToObject
(
pJson
,
"req_insert_batch_success"
,
pInfo
->
req_insert_batch_success
);
tjsonAddDoubleToObject
(
pJson
,
"req_insert_batch_rate"
,
pInfo
->
req_insert_batch_rate
);
tjsonAddDoubleToObject
(
pJson
,
"errors"
,
pInfo
->
errors
);
tjsonAddDoubleToObject
(
pJson
,
"vnodes_num"
,
pInfo
->
vnodes_num
);
tjsonAddDoubleToObject
(
pJson
,
"masters"
,
pInfo
->
masters
);
tjsonAddDoubleToObject
(
pJson
,
"has_mnode"
,
pInfo
->
has_mnode
);
}
}
void
monSetDiskInfo
(
SMonInfo
*
pMonitor
,
SMonDiskInfo
*
pInfo
)
{
void
monSetDiskInfo
(
SMonInfo
*
pMonitor
,
SMonDiskInfo
*
pInfo
)
{
SJson
*
pJson
=
tjsonCreateObject
();
if
(
pJson
==
NULL
)
return
;
if
(
tjsonAddItemToObject
(
pMonitor
->
pJson
,
"disk_infos"
,
pJson
)
!=
0
)
{
tjsonDelete
(
pJson
);
return
;
}
SJson
*
pDatadirsJson
=
tjsonAddArrayToObject
(
pJson
,
"datadir"
);
if
(
pDatadirsJson
==
NULL
)
return
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
datadirs
);
++
i
)
{
SJson
*
pDatadirJson
=
tjsonCreateObject
();
if
(
pDatadirJson
==
NULL
)
continue
;
SMonDiskDesc
*
pDatadirDesc
=
taosArrayGet
(
pInfo
->
datadirs
,
i
);
if
(
tjsonAddStringToObject
(
pDatadirJson
,
"name"
,
pDatadirDesc
->
name
)
!=
0
)
tjsonDelete
(
pDatadirJson
);
if
(
tjsonAddDoubleToObject
(
pDatadirJson
,
"level"
,
pDatadirDesc
->
level
)
!=
0
)
tjsonDelete
(
pDatadirJson
);
if
(
tjsonAddDoubleToObject
(
pDatadirJson
,
"avail"
,
pDatadirDesc
->
size
.
avail
)
!=
0
)
tjsonDelete
(
pDatadirJson
);
if
(
tjsonAddDoubleToObject
(
pDatadirJson
,
"used"
,
pDatadirDesc
->
size
.
used
)
!=
0
)
tjsonDelete
(
pDatadirJson
);
if
(
tjsonAddDoubleToObject
(
pDatadirJson
,
"total"
,
pDatadirDesc
->
size
.
total
)
!=
0
)
tjsonDelete
(
pDatadirJson
);
if
(
tjsonAddItemToArray
(
pDatadirsJson
,
pDatadirJson
)
!=
0
)
tjsonDelete
(
pDatadirJson
);
}
SJson
*
pLogdirJson
=
tjsonCreateObject
();
if
(
pLogdirJson
==
NULL
)
return
;
if
(
tjsonAddItemToObject
(
pJson
,
"logdir"
,
pLogdirJson
)
!=
0
)
return
;
tjsonAddStringToObject
(
pLogdirJson
,
"name"
,
pInfo
->
logdir
.
name
);
tjsonAddDoubleToObject
(
pLogdirJson
,
"avail"
,
pInfo
->
logdir
.
size
.
avail
);
tjsonAddDoubleToObject
(
pLogdirJson
,
"used"
,
pInfo
->
logdir
.
size
.
used
);
tjsonAddDoubleToObject
(
pLogdirJson
,
"total"
,
pInfo
->
logdir
.
size
.
total
);
SJson
*
pTempdirJson
=
tjsonCreateObject
();
if
(
pTempdirJson
==
NULL
)
return
;
if
(
tjsonAddItemToObject
(
pJson
,
"tempdir"
,
pTempdirJson
)
!=
0
)
return
;
tjsonAddStringToObject
(
pTempdirJson
,
"name"
,
pInfo
->
tempdir
.
name
);
tjsonAddDoubleToObject
(
pTempdirJson
,
"avail"
,
pInfo
->
tempdir
.
size
.
avail
);
tjsonAddDoubleToObject
(
pTempdirJson
,
"used"
,
pInfo
->
tempdir
.
size
.
used
);
tjsonAddDoubleToObject
(
pTempdirJson
,
"total"
,
pInfo
->
tempdir
.
size
.
total
);
}
static
void
monSetLogInfo
(
SMonInfo
*
pMonitor
)
{
SJson
*
pJson
=
tjsonCreateObject
();
if
(
pJson
==
NULL
)
return
;
if
(
tjsonAddItemToObject
(
pMonitor
->
pJson
,
"log_infos"
,
pJson
)
!=
0
)
{
tjsonDelete
(
pJson
);
return
;
}
SJson
*
pLogsJson
=
tjsonAddArrayToObject
(
pJson
,
"logs"
);
if
(
pLogsJson
==
NULL
)
return
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pMonitor
->
logs
);
++
i
)
{
SJson
*
pLogJson
=
tjsonCreateObject
();
if
(
pLogJson
==
NULL
)
continue
;
SMonLogItem
*
pLogItem
=
taosArrayGet
(
pMonitor
->
logs
,
i
);
char
buf
[
40
]
=
{
0
};
taosFormatUtcTime
(
buf
,
sizeof
(
buf
),
pLogItem
->
ts
,
TSDB_TIME_PRECISION_MILLI
);
tjsonAddStringToObject
(
pLogJson
,
"ts"
,
buf
);
tjsonAddDoubleToObject
(
pLogJson
,
"level"
,
pLogItem
->
level
);
tjsonAddStringToObject
(
pLogJson
,
"content"
,
pLogItem
->
content
);
if
(
tjsonAddItemToArray
(
pLogsJson
,
pLogJson
)
!=
0
)
tjsonDelete
(
pLogJson
);
}
SJson
*
pSummaryJson
=
tjsonAddArrayToObject
(
pJson
,
"summary"
);
if
(
pSummaryJson
==
NULL
)
return
;
SJson
*
pLogError
=
tjsonCreateObject
();
if
(
pLogError
==
NULL
)
return
;
tjsonAddStringToObject
(
pLogError
,
"level"
,
"error"
);
tjsonAddDoubleToObject
(
pLogError
,
"total"
,
1
);
if
(
tjsonAddItemToArray
(
pSummaryJson
,
pLogError
)
!=
0
)
tjsonDelete
(
pLogError
);
SJson
*
pLogInfo
=
tjsonCreateObject
();
if
(
pLogInfo
==
NULL
)
return
;
tjsonAddStringToObject
(
pLogInfo
,
"level"
,
"info"
);
tjsonAddDoubleToObject
(
pLogInfo
,
"total"
,
1
);
if
(
tjsonAddItemToArray
(
pSummaryJson
,
pLogInfo
)
!=
0
)
tjsonDelete
(
pLogInfo
);
SJson
*
pLogDebug
=
tjsonCreateObject
();
if
(
pLogDebug
==
NULL
)
return
;
tjsonAddStringToObject
(
pLogDebug
,
"level"
,
"debug"
);
tjsonAddDoubleToObject
(
pLogDebug
,
"total"
,
1
);
if
(
tjsonAddItemToArray
(
pSummaryJson
,
pLogDebug
)
!=
0
)
tjsonDelete
(
pLogDebug
);
SJson
*
pLogTrace
=
tjsonCreateObject
();
if
(
pLogTrace
==
NULL
)
return
;
tjsonAddStringToObject
(
pLogTrace
,
"level"
,
"trace"
);
tjsonAddDoubleToObject
(
pLogTrace
,
"total"
,
1
);
if
(
tjsonAddItemToArray
(
pSummaryJson
,
pLogTrace
)
!=
0
)
tjsonDelete
(
pLogTrace
);
}
void
monSendReport
(
SMonInfo
*
pMonitor
)
{
monSetLogInfo
(
pMonitor
);
char
*
pCont
=
tjsonToString
(
pMonitor
->
pJson
);
if
(
pCont
!=
NULL
)
{
taosSendHttpReport
(
tsMonitor
.
server
,
tsMonitor
.
port
,
pCont
,
strlen
(
pCont
));
free
(
pCont
);
}
}
}
source/libs/monitor/test/monTest.cpp
浏览文件 @
f7239014
...
@@ -13,21 +13,262 @@
...
@@ -13,21 +13,262 @@
#include "os.h"
#include "os.h"
#include "monitor.h"
#include "monitor.h"
#include "tglobal.h"
class
MonitorTest
:
public
::
testing
::
Test
{
class
MonitorTest
:
public
::
testing
::
Test
{
protected:
protected:
static
void
SetUpTestSuite
()
{
root
=
"/tmp/monTest"
;
}
static
void
SetUpTestSuite
()
{
static
void
TearDownTestSuite
()
{}
SMonCfg
cfg
;
cfg
.
maxLogs
=
2
;
cfg
.
port
=
80
;
cfg
.
server
=
"localhost"
;
monInit
(
&
cfg
);
}
static
void
TearDownTestSuite
()
{
monCleanup
();
}
public:
public:
void
SetUp
()
override
{}
void
SetUp
()
override
{}
void
TearDown
()
override
{}
void
TearDown
()
override
{}
static
const
char
*
root
;
void
GetBasicInfo
(
SMonInfo
*
pMonitor
,
SMonBasicInfo
*
pInfo
);
void
GetClusterInfo
(
SMonInfo
*
pMonitor
,
SMonClusterInfo
*
pInfo
);
void
GetVgroupInfo
(
SMonInfo
*
pMonitor
,
SMonVgroupInfo
*
pInfo
);
void
GetGrantInfo
(
SMonInfo
*
pMonitor
,
SMonGrantInfo
*
pInfo
);
void
GetDnodeInfo
(
SMonInfo
*
pMonitor
,
SMonDnodeInfo
*
pInfo
);
void
GetDiskInfo
(
SMonInfo
*
pMonitor
,
SMonDiskInfo
*
pInfo
);
void
AddLogInfo1
();
void
AddLogInfo2
();
};
};
const
char
*
MonitorTest
::
root
;
void
MonitorTest
::
GetBasicInfo
(
SMonInfo
*
pMonitor
,
SMonBasicInfo
*
pInfo
)
{
pInfo
->
dnode_id
=
1
;
strcpy
(
pInfo
->
dnode_ep
,
"localhost"
);
}
void
MonitorTest
::
GetClusterInfo
(
SMonInfo
*
pMonitor
,
SMonClusterInfo
*
pInfo
)
{
strcpy
(
pInfo
->
first_ep
,
"localhost:6030"
);
pInfo
->
first_ep_dnode_id
=
1
;
strcpy
(
pInfo
->
version
,
"3.0.0.0"
);
pInfo
->
master_uptime
=
1
;
pInfo
->
monitor_interval
=
2
;
pInfo
->
vgroups_total
=
3
;
pInfo
->
vgroups_alive
=
43
;
pInfo
->
vnodes_total
=
5
;
pInfo
->
vnodes_alive
=
6
;
pInfo
->
connections_total
=
7
;
pInfo
->
dnodes
=
taosArrayInit
(
4
,
sizeof
(
SMonDnodeDesc
));
SMonDnodeDesc
d1
=
{
0
};
d1
.
dnode_id
=
1
;
strcpy
(
d1
.
dnode_ep
,
"localhost:6030"
);
strcpy
(
d1
.
status
,
"ready"
);
taosArrayPush
(
pInfo
->
dnodes
,
&
d1
);
SMonDnodeDesc
d2
=
{
0
};
d2
.
dnode_id
=
2
;
strcpy
(
d2
.
dnode_ep
,
"localhost:7030"
);
strcpy
(
d2
.
status
,
"offline"
);
taosArrayPush
(
pInfo
->
dnodes
,
&
d2
);
pInfo
->
mnodes
=
taosArrayInit
(
4
,
sizeof
(
SMonMnodeDesc
));
SMonMnodeDesc
m1
=
{
0
};
m1
.
mnode_id
=
1
;
strcpy
(
m1
.
mnode_ep
,
"localhost:6030"
);
strcpy
(
m1
.
role
,
"master"
);
taosArrayPush
(
pInfo
->
mnodes
,
&
m1
);
SMonMnodeDesc
m2
=
{
0
};
m2
.
mnode_id
=
2
;
strcpy
(
m2
.
mnode_ep
,
"localhost:7030"
);
strcpy
(
m2
.
role
,
"unsynced"
);
taosArrayPush
(
pInfo
->
mnodes
,
&
m2
);
}
void
MonitorTest
::
GetVgroupInfo
(
SMonInfo
*
pMonitor
,
SMonVgroupInfo
*
pInfo
)
{
pInfo
->
vgroups
=
taosArrayInit
(
4
,
sizeof
(
SMonVgroupDesc
));
SMonVgroupDesc
vg1
=
{
0
};
vg1
.
vgroup_id
=
1
;
strcpy
(
vg1
.
database_name
,
"d1"
);
vg1
.
tables_num
=
4
;
strcpy
(
vg1
.
status
,
"ready"
);
vg1
.
vnodes
[
0
].
dnode_id
=
1
;
strcpy
(
vg1
.
vnodes
[
0
].
vnode_role
,
"master"
);
vg1
.
vnodes
[
1
].
dnode_id
=
2
;
strcpy
(
vg1
.
vnodes
[
1
].
vnode_role
,
"slave"
);
taosArrayPush
(
pInfo
->
vgroups
,
&
vg1
);
SMonVgroupDesc
vg2
=
{
0
};
vg2
.
vgroup_id
=
2
;
strcpy
(
vg2
.
database_name
,
"d2"
);
vg2
.
tables_num
=
5
;
strcpy
(
vg2
.
status
,
"offline"
);
vg2
.
vnodes
[
0
].
dnode_id
=
1
;
strcpy
(
vg2
.
vnodes
[
0
].
vnode_role
,
"master"
);
vg2
.
vnodes
[
1
].
dnode_id
=
2
;
strcpy
(
vg2
.
vnodes
[
1
].
vnode_role
,
"unsynced"
);
taosArrayPush
(
pInfo
->
vgroups
,
&
vg2
);
SMonVgroupDesc
vg3
=
{
0
};
vg3
.
vgroup_id
=
3
;
strcpy
(
vg3
.
database_name
,
"d3"
);
vg3
.
tables_num
=
6
;
strcpy
(
vg3
.
status
,
"ready"
);
vg3
.
vnodes
[
0
].
dnode_id
=
1
;
strcpy
(
vg3
.
vnodes
[
0
].
vnode_role
,
"master"
);
taosArrayPush
(
pInfo
->
vgroups
,
&
vg3
);
}
void
MonitorTest
::
GetGrantInfo
(
SMonInfo
*
pMonitor
,
SMonGrantInfo
*
pInfo
)
{
pInfo
->
expire_time
=
1234567
;
pInfo
->
timeseries_total
=
234567
;
pInfo
->
timeseries_used
=
34567
;
}
void
MonitorTest
::
GetDnodeInfo
(
SMonInfo
*
pMonitor
,
SMonDnodeInfo
*
pInfo
)
{
pInfo
->
uptime
=
1.2
;
pInfo
->
cpu_engine
=
2.1
;
pInfo
->
cpu_system
=
2.1
;
pInfo
->
cpu_cores
=
2
;
pInfo
->
mem_engine
=
3.1
;
pInfo
->
mem_system
=
3.2
;
pInfo
->
mem_total
=
3.3
;
pInfo
->
disk_engine
=
4.1
;
pInfo
->
disk_used
=
4.2
;
pInfo
->
disk_total
=
4.3
;
pInfo
->
net_in
=
5.1
;
pInfo
->
net_out
=
5.2
;
pInfo
->
io_read
=
6.1
;
pInfo
->
io_write
=
6.2
;
pInfo
->
io_read_disk
=
7.1
;
pInfo
->
io_write_disk
=
7.2
;
pInfo
->
req_select
=
8
;
pInfo
->
req_select_rate
=
8.1
;
pInfo
->
req_insert
=
9
;
pInfo
->
req_insert_success
=
10
;
pInfo
->
req_insert_rate
=
10.1
;
pInfo
->
req_insert_batch
=
11
;
pInfo
->
req_insert_batch_success
=
12
;
pInfo
->
req_insert_batch_rate
=
12.3
;
pInfo
->
errors
=
4
;
pInfo
->
vnodes_num
=
5
;
pInfo
->
masters
=
6
;
pInfo
->
has_mnode
=
1
;
}
void
MonitorTest
::
GetDiskInfo
(
SMonInfo
*
pMonitor
,
SMonDiskInfo
*
pInfo
)
{
pInfo
->
datadirs
=
taosArrayInit
(
2
,
sizeof
(
SMonDiskDesc
));
SMonDiskDesc
d1
=
{
0
};
strcpy
(
d1
.
name
,
"/t1/d1/d"
);
d1
.
level
=
0
;
d1
.
size
.
avail
=
11
;
d1
.
size
.
total
=
12
;
d1
.
size
.
used
=
13
;
taosArrayPush
(
pInfo
->
datadirs
,
&
d1
);
SMonDiskDesc
d2
=
{
0
};
strcpy
(
d2
.
name
,
"/t2d2/d"
);
d2
.
level
=
2
;
d2
.
size
.
avail
=
21
;
d2
.
size
.
total
=
22
;
d2
.
size
.
used
=
23
;
taosArrayPush
(
pInfo
->
datadirs
,
&
d2
);
SMonDiskDesc
d3
=
{
0
};
strcpy
(
d3
.
name
,
"/t3/d3/d"
);
d3
.
level
=
3
;
d3
.
size
.
avail
=
31
;
d3
.
size
.
total
=
32
;
d3
.
size
.
used
=
33
;
taosArrayPush
(
pInfo
->
datadirs
,
&
d3
);
strcpy
(
pInfo
->
logdir
.
name
,
"/log/dir/d"
);
pInfo
->
logdir
.
size
.
avail
=
41
;
pInfo
->
logdir
.
size
.
total
=
42
;
pInfo
->
logdir
.
size
.
used
=
43
;
strcpy
(
pInfo
->
tempdir
.
name
,
"/data/dir/d"
);
pInfo
->
tempdir
.
size
.
avail
=
51
;
pInfo
->
tempdir
.
size
.
total
=
52
;
pInfo
->
tempdir
.
size
.
used
=
53
;
}
void
MonitorTest
::
AddLogInfo1
()
{
SMonLogItem
log1
=
{
0
};
log1
.
ts
=
taosGetTimestampMs
();
log1
.
level
=
1
;
strcpy
(
log1
.
content
,
"1 -------------------------- a"
);
monAddLogItem
(
&
log1
);
SMonLogItem
log2
=
{
0
};
log2
.
ts
=
taosGetTimestampMs
();
log2
.
level
=
1
;
strcpy
(
log2
.
content
,
"1 ------------------------ b"
);
monAddLogItem
(
&
log2
);
SMonLogItem
log3
=
{
0
};
log3
.
ts
=
taosGetTimestampMs
();
log3
.
level
=
1
;
strcpy
(
log3
.
content
,
"1 ------- c"
);
monAddLogItem
(
&
log3
);
}
void
MonitorTest
::
AddLogInfo2
()
{
SMonLogItem
log1
;
log1
.
ts
=
taosGetTimestampMs
();
log1
.
level
=
01
;
strcpy
(
log1
.
content
,
"2 ------- a"
);
monAddLogItem
(
&
log1
);
SMonLogItem
log2
;
log2
.
ts
=
taosGetTimestampMs
();
log2
.
level
=
0
;
strcpy
(
log2
.
content
,
"2 ------- b"
);
monAddLogItem
(
&
log2
);
}
TEST_F
(
MonitorTest
,
01
_Full
)
{
AddLogInfo1
();
SMonInfo
*
pMonitor
=
monCreateMonitorInfo
();
if
(
pMonitor
==
NULL
)
return
;
SMonBasicInfo
basicInfo
=
{
0
};
GetBasicInfo
(
pMonitor
,
&
basicInfo
);
monSetBasicInfo
(
pMonitor
,
&
basicInfo
);
SMonClusterInfo
clusterInfo
=
{
0
};
SMonVgroupInfo
vgroupInfo
=
{
0
};
SMonGrantInfo
grantInfo
=
{
0
};
GetClusterInfo
(
pMonitor
,
&
clusterInfo
);
GetVgroupInfo
(
pMonitor
,
&
vgroupInfo
);
GetGrantInfo
(
pMonitor
,
&
grantInfo
);
monSetClusterInfo
(
pMonitor
,
&
clusterInfo
);
monSetVgroupInfo
(
pMonitor
,
&
vgroupInfo
);
monSetGrantInfo
(
pMonitor
,
&
grantInfo
);
SMonDnodeInfo
dnodeInfo
=
{
0
};
GetDnodeInfo
(
pMonitor
,
&
dnodeInfo
);
monSetDnodeInfo
(
pMonitor
,
&
dnodeInfo
);
SMonDiskInfo
diskInfo
=
{
0
};
GetDiskInfo
(
pMonitor
,
&
diskInfo
);
monSetDiskInfo
(
pMonitor
,
&
diskInfo
);
monSendReport
(
pMonitor
);
monCleanupMonitorInfo
(
pMonitor
);
taosArrayDestroy
(
clusterInfo
.
dnodes
);
taosArrayDestroy
(
clusterInfo
.
mnodes
);
taosArrayDestroy
(
vgroupInfo
.
vgroups
);
taosArrayDestroy
(
diskInfo
.
datadirs
);
}
TEST_F
(
MonitorTest
,
02
_Log
)
{
AddLogInfo2
();
SMonInfo
*
pMonitor
=
monCreateMonitorInfo
();
if
(
pMonitor
==
NULL
)
return
;
TEST_F
(
MonitorTest
,
01
_Open_Close
)
{
monSendReport
(
pMonitor
);
monCleanupMonitorInfo
(
pMonitor
);
}
}
source/libs/transport/src/transSrv.c
浏览文件 @
f7239014
...
@@ -286,15 +286,17 @@ void uvOnWriteCb(uv_write_t* req, int status) {
...
@@ -286,15 +286,17 @@ void uvOnWriteCb(uv_write_t* req, int status) {
transClearBuffer
(
&
conn
->
readBuf
);
transClearBuffer
(
&
conn
->
readBuf
);
if
(
status
==
0
)
{
if
(
status
==
0
)
{
tTrace
(
"server conn %p data already was written on stream"
,
conn
);
tTrace
(
"server conn %p data already was written on stream"
,
conn
);
assert
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>=
1
);
if
(
conn
->
srvMsgs
!=
NULL
)
{
SSrvMsg
*
msg
=
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
assert
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>=
1
);
taosArrayRemove
(
conn
->
srvMsgs
,
0
);
SSrvMsg
*
msg
=
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
destroySmsg
(
msg
);
taosArrayRemove
(
conn
->
srvMsgs
,
0
);
destroySmsg
(
msg
);
// send second data, just use for push
if
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>
0
)
{
// send second data, just use for push
msg
=
(
SSrvMsg
*
)
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
if
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>
0
)
{
uvStartSendRespInternal
(
msg
);
msg
=
(
SSrvMsg
*
)
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
uvStartSendRespInternal
(
msg
);
}
}
}
}
else
{
}
else
{
tError
(
"server conn %p failed to write data, %s"
,
conn
,
uv_err_name
(
status
));
tError
(
"server conn %p failed to write data, %s"
,
conn
,
uv_err_name
(
status
));
...
@@ -615,7 +617,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
...
@@ -615,7 +617,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
SSrvMsg
*
msg
=
taosArrayGetP
(
conn
->
srvMsgs
,
i
);
SSrvMsg
*
msg
=
taosArrayGetP
(
conn
->
srvMsgs
,
i
);
destroySmsg
(
msg
);
destroySmsg
(
msg
);
}
}
taosArrayDestroy
(
conn
->
srvMsgs
);
conn
->
srvMsgs
=
taosArrayDestroy
(
conn
->
srvMsgs
);
QUEUE_REMOVE
(
&
conn
->
queue
);
QUEUE_REMOVE
(
&
conn
->
queue
);
if
(
clear
)
{
if
(
clear
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录