Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bdaaabe7
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
bdaaabe7
编写于
2月 16, 2020
作者:
陶建辉(Jeff)
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
initial version for RPC
上级
cb1c3a17
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
299 addition
and
0 deletion
+299
-0
src/rpc/inc/tcache.h
src/rpc/inc/tcache.h
+35
-0
src/rpc/src/tcache.c
src/rpc/src/tcache.c
+264
-0
未找到文件。
src/rpc/inc/tcache.h
0 → 100644
浏览文件 @
bdaaabe7
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCCACHE_H
#define TDENGINE_TSCCACHE_H
#ifdef __cplusplus
extern
"C"
{
#endif
void
*
taosOpenConnCache
(
int
maxSessions
,
void
(
*
cleanFp
)(
void
*
),
void
*
tmrCtrl
,
int64_t
keepTimer
);
void
taosCloseConnCache
(
void
*
handle
);
void
*
taosAddConnIntoCache
(
void
*
handle
,
void
*
data
,
uint32_t
ip
,
uint16_t
port
,
char
*
user
);
void
*
taosGetConnFromCache
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
,
char
*
user
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCACHE_H
src/rpc/src/tcache.c
0 → 100644
浏览文件 @
bdaaabe7
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tmempool.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
typedef
struct
_c_hash_t
{
uint32_t
ip
;
uint16_t
port
;
struct
_c_hash_t
*
prev
;
struct
_c_hash_t
*
next
;
void
*
data
;
uint64_t
time
;
}
SConnHash
;
typedef
struct
{
SConnHash
**
connHashList
;
mpool_h
connHashMemPool
;
int
maxSessions
;
int
total
;
int
*
count
;
int64_t
keepTimer
;
pthread_mutex_t
mutex
;
void
(
*
cleanFp
)(
void
*
);
void
*
tmrCtrl
;
void
*
pTimer
;
}
SConnCache
;
int
taosHashConn
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
,
char
*
user
)
{
SConnCache
*
pObj
=
(
SConnCache
*
)
handle
;
int
hash
=
0
;
// size_t user_len = strlen(user);
hash
=
ip
>>
16
;
hash
+=
(
unsigned
short
)(
ip
&
0xFFFF
);
hash
+=
port
;
while
(
*
user
!=
'\0'
)
{
hash
+=
*
user
;
user
++
;
}
hash
=
hash
%
pObj
->
maxSessions
;
return
hash
;
}
void
taosRemoveExpiredNodes
(
SConnCache
*
pObj
,
SConnHash
*
pNode
,
int
hash
,
uint64_t
time
)
{
if
(
pNode
==
NULL
)
return
;
if
(
time
<
pObj
->
keepTimer
+
pNode
->
time
)
return
;
SConnHash
*
pPrev
=
pNode
->
prev
,
*
pNext
;
while
(
pNode
)
{
(
*
pObj
->
cleanFp
)(
pNode
->
data
);
pNext
=
pNode
->
next
;
pObj
->
total
--
;
pObj
->
count
[
hash
]
--
;
tscTrace
(
"%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d"
,
pNode
->
data
,
pNode
->
ip
,
pNode
->
port
,
hash
,
pNode
,
pObj
->
count
[
hash
]);
taosMemPoolFree
(
pObj
->
connHashMemPool
,
(
char
*
)
pNode
);
pNode
=
pNext
;
}
if
(
pPrev
)
pPrev
->
next
=
NULL
;
else
pObj
->
connHashList
[
hash
]
=
NULL
;
}
void
*
taosAddConnIntoCache
(
void
*
handle
,
void
*
data
,
uint32_t
ip
,
uint16_t
port
,
char
*
user
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pObj
;
uint64_t
time
=
taosGetTimestampMs
();
pObj
=
(
SConnCache
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
NULL
;
if
(
data
==
NULL
)
{
tscTrace
(
"data:%p ip:%p:%d not valid, not added in cache"
,
data
,
ip
,
port
);
return
NULL
;
}
hash
=
taosHashConn
(
pObj
,
ip
,
port
,
user
);
pNode
=
(
SConnHash
*
)
taosMemPoolMalloc
(
pObj
->
connHashMemPool
);
pNode
->
ip
=
ip
;
pNode
->
port
=
port
;
pNode
->
data
=
data
;
pNode
->
prev
=
NULL
;
pNode
->
time
=
time
;
pthread_mutex_lock
(
&
pObj
->
mutex
);
pNode
->
next
=
pObj
->
connHashList
[
hash
];
if
(
pObj
->
connHashList
[
hash
]
!=
NULL
)
(
pObj
->
connHashList
[
hash
])
->
prev
=
pNode
;
pObj
->
connHashList
[
hash
]
=
pNode
;
pObj
->
total
++
;
pObj
->
count
[
hash
]
++
;
taosRemoveExpiredNodes
(
pObj
,
pNode
->
next
,
hash
,
time
);
pthread_mutex_unlock
(
&
pObj
->
mutex
);
tscTrace
(
"%p ip:0x%x:%hu:%d:%p added, connections in cache:%d"
,
data
,
ip
,
port
,
hash
,
pNode
,
pObj
->
count
[
hash
]);
return
pObj
;
}
void
taosCleanConnCache
(
void
*
handle
,
void
*
tmrId
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pObj
;
pObj
=
(
SConnCache
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
;
if
(
pObj
->
pTimer
!=
tmrId
)
return
;
uint64_t
time
=
taosGetTimestampMs
();
for
(
hash
=
0
;
hash
<
pObj
->
maxSessions
;
++
hash
)
{
pthread_mutex_lock
(
&
pObj
->
mutex
);
pNode
=
pObj
->
connHashList
[
hash
];
taosRemoveExpiredNodes
(
pObj
,
pNode
,
hash
,
time
);
pthread_mutex_unlock
(
&
pObj
->
mutex
);
}
// tscTrace("timer, total connections in cache:%d", pObj->total);
taosTmrReset
(
taosCleanConnCache
,
pObj
->
keepTimer
*
2
,
pObj
,
pObj
->
tmrCtrl
,
&
pObj
->
pTimer
);
}
void
*
taosGetConnFromCache
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
,
char
*
user
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pObj
;
void
*
pData
=
NULL
;
pObj
=
(
SConnCache
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
NULL
;
uint64_t
time
=
taosGetTimestampMs
();
hash
=
taosHashConn
(
pObj
,
ip
,
port
,
user
);
pthread_mutex_lock
(
&
pObj
->
mutex
);
pNode
=
pObj
->
connHashList
[
hash
];
while
(
pNode
)
{
if
(
time
>=
pObj
->
keepTimer
+
pNode
->
time
)
{
taosRemoveExpiredNodes
(
pObj
,
pNode
,
hash
,
time
);
pNode
=
NULL
;
break
;
}
if
(
pNode
->
ip
==
ip
&&
pNode
->
port
==
port
)
break
;
pNode
=
pNode
->
next
;
}
if
(
pNode
)
{
taosRemoveExpiredNodes
(
pObj
,
pNode
->
next
,
hash
,
time
);
if
(
pNode
->
prev
)
{
pNode
->
prev
->
next
=
pNode
->
next
;
}
else
{
pObj
->
connHashList
[
hash
]
=
pNode
->
next
;
}
if
(
pNode
->
next
)
{
pNode
->
next
->
prev
=
pNode
->
prev
;
}
pData
=
pNode
->
data
;
taosMemPoolFree
(
pObj
->
connHashMemPool
,
(
char
*
)
pNode
);
pObj
->
total
--
;
pObj
->
count
[
hash
]
--
;
}
pthread_mutex_unlock
(
&
pObj
->
mutex
);
if
(
pData
)
{
tscTrace
(
"%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d"
,
pData
,
ip
,
port
,
hash
,
pNode
,
pObj
->
count
[
hash
]);
}
return
pData
;
}
void
*
taosOpenConnCache
(
int
maxSessions
,
void
(
*
cleanFp
)(
void
*
),
void
*
tmrCtrl
,
int64_t
keepTimer
)
{
SConnHash
**
connHashList
;
mpool_h
connHashMemPool
;
SConnCache
*
pObj
;
connHashMemPool
=
taosMemPoolInit
(
maxSessions
,
sizeof
(
SConnHash
));
if
(
connHashMemPool
==
0
)
return
NULL
;
connHashList
=
calloc
(
sizeof
(
SConnHash
*
),
maxSessions
);
if
(
connHashList
==
0
)
{
taosMemPoolCleanUp
(
connHashMemPool
);
return
NULL
;
}
pObj
=
malloc
(
sizeof
(
SConnCache
));
if
(
pObj
==
NULL
)
{
taosMemPoolCleanUp
(
connHashMemPool
);
free
(
connHashList
);
return
NULL
;
}
memset
(
pObj
,
0
,
sizeof
(
SConnCache
));
pObj
->
count
=
calloc
(
sizeof
(
int
),
maxSessions
);
pObj
->
total
=
0
;
pObj
->
keepTimer
=
keepTimer
;
pObj
->
maxSessions
=
maxSessions
;
pObj
->
connHashMemPool
=
connHashMemPool
;
pObj
->
connHashList
=
connHashList
;
pObj
->
cleanFp
=
cleanFp
;
pObj
->
tmrCtrl
=
tmrCtrl
;
taosTmrReset
(
taosCleanConnCache
,
pObj
->
keepTimer
*
2
,
pObj
,
pObj
->
tmrCtrl
,
&
pObj
->
pTimer
);
pthread_mutex_init
(
&
pObj
->
mutex
,
NULL
);
return
pObj
;
}
void
taosCloseConnCache
(
void
*
handle
)
{
SConnCache
*
pObj
;
pObj
=
(
SConnCache
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
;
pthread_mutex_lock
(
&
pObj
->
mutex
);
taosTmrStopA
(
&
(
pObj
->
pTimer
));
if
(
pObj
->
connHashMemPool
)
taosMemPoolCleanUp
(
pObj
->
connHashMemPool
);
tfree
(
pObj
->
connHashList
);
tfree
(
pObj
->
count
)
pthread_mutex_unlock
(
&
pObj
->
mutex
);
pthread_mutex_destroy
(
&
pObj
->
mutex
);
memset
(
pObj
,
0
,
sizeof
(
SConnCache
));
free
(
pObj
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录