Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
19ba6ef2
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
19ba6ef2
编写于
7月 04, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
7月 04, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2561 from taosdata/feature/query
Feature/query
上级
6e818791
838ffb5f
变更
17
显示空白变更内容
内联
并排
Showing
17 changed file
with
256 addition
and
113 deletion
+256
-113
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+2
-2
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+21
-19
src/inc/dnode.h
src/inc/dnode.h
+1
-0
src/inc/query.h
src/inc/query.h
+7
-0
src/inc/taosdef.h
src/inc/taosdef.h
+1
-0
src/inc/vnode.h
src/inc/vnode.h
+1
-0
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+1
-1
src/mnode/src/mnodeShow.c
src/mnode/src/mnodeShow.c
+1
-1
src/plugins/http/src/httpContext.c
src/plugins/http/src/httpContext.c
+1
-1
src/plugins/http/src/httpSession.c
src/plugins/http/src/httpSession.c
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+127
-8
src/util/inc/tcache.h
src/util/inc/tcache.h
+3
-2
src/util/src/tcache.c
src/util/src/tcache.c
+35
-33
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+1
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+14
-14
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+38
-29
未找到文件。
src/client/src/tscLocal.c
浏览文件 @
19ba6ef2
...
@@ -406,7 +406,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
...
@@ -406,7 +406,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql
->
res
.
qhandle
=
0x1
;
pSql
->
res
.
qhandle
=
0x1
;
pSql
->
res
.
numOfRows
=
0
;
pSql
->
res
.
numOfRows
=
0
;
}
else
if
(
pCmd
->
command
==
TSDB_SQL_RESET_CACHE
)
{
}
else
if
(
pCmd
->
command
==
TSDB_SQL_RESET_CACHE
)
{
taosCacheEmpty
(
tscCacheHandle
);
taosCacheEmpty
(
tscCacheHandle
,
false
);
}
else
if
(
pCmd
->
command
==
TSDB_SQL_SERV_VERSION
)
{
}
else
if
(
pCmd
->
command
==
TSDB_SQL_SERV_VERSION
)
{
tscProcessServerVer
(
pSql
);
tscProcessServerVer
(
pSql
);
}
else
if
(
pCmd
->
command
==
TSDB_SQL_CLI_VERSION
)
{
}
else
if
(
pCmd
->
command
==
TSDB_SQL_CLI_VERSION
)
{
...
...
src/client/src/tscServer.c
浏览文件 @
19ba6ef2
...
@@ -1950,7 +1950,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
...
@@ -1950,7 +1950,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
}
}
int
tscProcessDropDbRsp
(
SSqlObj
*
UNUSED_PARAM
(
pSql
))
{
int
tscProcessDropDbRsp
(
SSqlObj
*
UNUSED_PARAM
(
pSql
))
{
taosCacheEmpty
(
tscCacheHandle
);
taosCacheEmpty
(
tscCacheHandle
,
false
);
return
0
;
return
0
;
}
}
...
@@ -1996,7 +1996,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
...
@@ -1996,7 +1996,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
if
(
isSuperTable
)
{
// if it is a super table, reset whole query cache
if
(
isSuperTable
)
{
// if it is a super table, reset whole query cache
tscDebug
(
"%p reset query cache since table:%s is stable"
,
pSql
,
pTableMetaInfo
->
name
);
tscDebug
(
"%p reset query cache since table:%s is stable"
,
pSql
,
pTableMetaInfo
->
name
);
taosCacheEmpty
(
tscCacheHandle
);
taosCacheEmpty
(
tscCacheHandle
,
false
);
}
}
}
}
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
19ba6ef2
...
@@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
...
@@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_FETCH
)
{
pVnode
=
vnodeGetVnode
(
pHead
->
vgId
);
}
else
{
pVnode
=
vnodeAccquireVnode
(
pHead
->
vgId
);
pVnode
=
vnodeAccquireVnode
(
pHead
->
vgId
);
}
if
(
pVnode
==
NULL
)
{
if
(
pVnode
==
NULL
)
{
leftLen
-=
pHead
->
contLen
;
leftLen
-=
pHead
->
contLen
;
...
@@ -179,24 +175,17 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
...
@@ -179,24 +175,17 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads
// dynamically adjust the number of threads
}
}
static
void
dnodeContinueExecuteQuery
(
void
*
pVnode
,
void
*
qhandle
,
SReadMsg
*
pMsg
)
{
void
dnodePutQhandleIntoReadQueue
(
void
*
pVnode
,
void
*
qhandle
)
{
SReadMsg
*
pRead
=
(
SReadMsg
*
)
taosAllocateQitem
(
sizeof
(
SReadMsg
));
SReadMsg
*
pRead
=
(
SReadMsg
*
)
taosAllocateQitem
(
sizeof
(
SReadMsg
));
pRead
->
rpcMsg
=
pMsg
->
rpcMsg
;
pRead
->
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_QUERY
;
pRead
->
pCont
=
qhandle
;
pRead
->
pCont
=
qhandle
;
pRead
->
contLen
=
0
;
pRead
->
contLen
=
0
;
pRead
->
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_QUERY
;
taos_queue
queue
=
vnode
Get
Rqueue
(
pVnode
);
taos_queue
queue
=
vnode
Accquire
Rqueue
(
pVnode
);
taosWriteQitem
(
queue
,
TAOS_QTYPE_
RPC
,
pRead
);
taosWriteQitem
(
queue
,
TAOS_QTYPE_
QUERY
,
pRead
);
}
}
void
dnodeSendRpcReadRsp
(
void
*
pVnode
,
SReadMsg
*
pRead
,
int32_t
code
)
{
void
dnodeSendRpcReadRsp
(
void
*
pVnode
,
SReadMsg
*
pRead
,
int32_t
code
)
{
if
(
code
==
TSDB_CODE_VND_ACTION_IN_PROGRESS
)
return
;
if
(
code
==
TSDB_CODE_VND_ACTION_NEED_REPROCESSED
)
{
dnodeContinueExecuteQuery
(
pVnode
,
pRead
->
rspRet
.
qhandle
,
pRead
);
code
=
TSDB_CODE_SUCCESS
;
}
SRpcMsg
rpcRsp
=
{
SRpcMsg
rpcRsp
=
{
.
handle
=
pRead
->
rpcMsg
.
handle
,
.
handle
=
pRead
->
rpcMsg
.
handle
,
.
pCont
=
pRead
->
rspRet
.
rsp
,
.
pCont
=
pRead
->
rspRet
.
rsp
,
...
@@ -206,6 +195,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
...
@@ -206,6 +195,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcSendResponse
(
&
rpcRsp
);
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pRead
->
rpcMsg
.
pCont
);
rpcFreeCont
(
pRead
->
rpcMsg
.
pCont
);
vnodeRelease
(
pVnode
);
}
void
dnodeDispatchNonRspMsg
(
void
*
pVnode
,
SReadMsg
*
pRead
,
int32_t
code
)
{
vnodeRelease
(
pVnode
);
return
;
}
}
static
void
*
dnodeProcessReadQueue
(
void
*
param
)
{
static
void
*
dnodeProcessReadQueue
(
void
*
param
)
{
...
@@ -219,9 +214,16 @@ static void *dnodeProcessReadQueue(void *param) {
...
@@ -219,9 +214,16 @@ static void *dnodeProcessReadQueue(void *param) {
break
;
break
;
}
}
dDebug
(
"%p, msg:%s will be processed in vread queue"
,
pReadMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pReadMsg
->
rpcMsg
.
msgType
]);
dDebug
(
"%p, msg:%s will be processed in vread queue, qtype:%d"
,
pReadMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pReadMsg
->
rpcMsg
.
msgType
],
type
);
int32_t
code
=
vnodeProcessRead
(
pVnode
,
pReadMsg
);
int32_t
code
=
vnodeProcessRead
(
pVnode
,
pReadMsg
);
if
(
type
==
TAOS_QTYPE_RPC
)
{
dnodeSendRpcReadRsp
(
pVnode
,
pReadMsg
,
code
);
dnodeSendRpcReadRsp
(
pVnode
,
pReadMsg
,
code
);
}
else
{
dnodeDispatchNonRspMsg
(
pVnode
,
pReadMsg
,
code
);
}
taosFreeQitem
(
pReadMsg
);
taosFreeQitem
(
pReadMsg
);
}
}
...
...
src/inc/dnode.h
浏览文件 @
19ba6ef2
...
@@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
...
@@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void
dnodeFreeVnodeWqueue
(
void
*
queue
);
void
dnodeFreeVnodeWqueue
(
void
*
queue
);
void
*
dnodeAllocateVnodeRqueue
(
void
*
pVnode
);
void
*
dnodeAllocateVnodeRqueue
(
void
*
pVnode
);
void
dnodeFreeVnodeRqueue
(
void
*
rqueue
);
void
dnodeFreeVnodeRqueue
(
void
*
rqueue
);
void
dnodePutQhandleIntoReadQueue
(
void
*
pVnode
,
void
*
qhandle
);
void
dnodeSendRpcVnodeWriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
);
void
dnodeSendRpcVnodeWriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
);
int32_t
dnodeAllocateMnodePqueue
();
int32_t
dnodeAllocateMnodePqueue
();
...
...
src/inc/query.h
浏览文件 @
19ba6ef2
...
@@ -84,6 +84,13 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
...
@@ -84,6 +84,13 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
*/
*/
int32_t
qKillQuery
(
qinfo_t
qinfo
);
int32_t
qKillQuery
(
qinfo_t
qinfo
);
void
*
qOpenQueryMgmt
(
int32_t
vgId
);
void
qSetQueryMgmtClosed
(
void
*
pExecutor
);
void
qCleanupQueryMgmt
(
void
*
pExecutor
);
void
**
qRegisterQInfo
(
void
*
pMgmt
,
void
*
qInfo
);
void
**
qAcquireQInfo
(
void
*
pMgmt
,
void
**
key
);
void
**
qReleaseQInfo
(
void
*
pMgmt
,
void
*
pQInfo
,
bool
needFree
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
src/inc/taosdef.h
浏览文件 @
19ba6ef2
...
@@ -365,6 +365,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
...
@@ -365,6 +365,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2
#define TAOS_QTYPE_WAL 2
#define TAOS_QTYPE_CQ 3
#define TAOS_QTYPE_CQ 3
#define TAOS_QTYPE_QUERY 4
typedef
enum
{
typedef
enum
{
TSDB_SUPER_TABLE
=
0
,
// super table
TSDB_SUPER_TABLE
=
0
,
// super table
...
...
src/inc/vnode.h
浏览文件 @
19ba6ef2
...
@@ -52,6 +52,7 @@ void vnodeRelease(void *pVnode);
...
@@ -52,6 +52,7 @@ void vnodeRelease(void *pVnode);
void
*
vnodeAccquireVnode
(
int32_t
vgId
);
// add refcount
void
*
vnodeAccquireVnode
(
int32_t
vgId
);
// add refcount
void
*
vnodeGetVnode
(
int32_t
vgId
);
// keep refcount unchanged
void
*
vnodeGetVnode
(
int32_t
vgId
);
// keep refcount unchanged
void
*
vnodeAccquireRqueue
(
void
*
);
void
*
vnodeGetRqueue
(
void
*
);
void
*
vnodeGetRqueue
(
void
*
);
void
*
vnodeGetWqueue
(
int32_t
vgId
);
void
*
vnodeGetWqueue
(
int32_t
vgId
);
void
*
vnodeGetWal
(
void
*
pVnode
);
void
*
vnodeGetWal
(
void
*
pVnode
);
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
19ba6ef2
...
@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
...
@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle
(
TSDB_MSG_TYPE_CM_KILL_STREAM
,
mnodeProcessKillStreamMsg
);
mnodeAddWriteMsgHandle
(
TSDB_MSG_TYPE_CM_KILL_STREAM
,
mnodeProcessKillStreamMsg
);
mnodeAddWriteMsgHandle
(
TSDB_MSG_TYPE_CM_KILL_CONN
,
mnodeProcessKillConnectionMsg
);
mnodeAddWriteMsgHandle
(
TSDB_MSG_TYPE_CM_KILL_CONN
,
mnodeProcessKillConnectionMsg
);
tsMnodeConnCache
=
taosCacheInit
WithCb
(
TSDB_DATA_TYPE_INT
,
CONN_CHECK_TIME
,
false
,
mnodeFreeConn
,
"conn"
);
tsMnodeConnCache
=
taosCacheInit
(
TSDB_DATA_TYPE_INT
,
CONN_CHECK_TIME
,
false
,
mnodeFreeConn
,
"conn"
);
return
0
;
return
0
;
}
}
...
...
src/mnode/src/mnodeShow.c
浏览文件 @
19ba6ef2
...
@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
...
@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mnodeProcessConnectMsg
);
mnodeAddReadMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mnodeProcessConnectMsg
);
mnodeAddReadMsgHandle
(
TSDB_MSG_TYPE_CM_USE_DB
,
mnodeProcessUseMsg
);
mnodeAddReadMsgHandle
(
TSDB_MSG_TYPE_CM_USE_DB
,
mnodeProcessUseMsg
);
tsMnodeShowCache
=
taosCacheInit
WithCb
(
TSDB_DATA_TYPE_INT
,
5
,
false
,
mnodeFreeShowObj
,
"show"
);
tsMnodeShowCache
=
taosCacheInit
(
TSDB_DATA_TYPE_INT
,
5
,
false
,
mnodeFreeShowObj
,
"show"
);
return
0
;
return
0
;
}
}
...
...
src/plugins/http/src/httpContext.c
浏览文件 @
19ba6ef2
...
@@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) {
...
@@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) {
}
}
bool
httpInitContexts
()
{
bool
httpInitContexts
()
{
tsHttpServer
.
contextCache
=
taosCacheInit
WithCb
(
TSDB_DATA_TYPE_BIGINT
,
2
,
false
,
httpDestroyContext
,
"restc"
);
tsHttpServer
.
contextCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BIGINT
,
2
,
false
,
httpDestroyContext
,
"restc"
);
if
(
tsHttpServer
.
contextCache
==
NULL
)
{
if
(
tsHttpServer
.
contextCache
==
NULL
)
{
httpError
(
"failed to init context cache"
);
httpError
(
"failed to init context cache"
);
return
false
;
return
false
;
...
...
src/plugins/http/src/httpSession.c
浏览文件 @
19ba6ef2
...
@@ -115,7 +115,7 @@ void httpCleanUpSessions() {
...
@@ -115,7 +115,7 @@ void httpCleanUpSessions() {
}
}
bool
httpInitSessions
()
{
bool
httpInitSessions
()
{
tsHttpServer
.
sessionCache
=
taosCacheInit
WithCb
(
TSDB_DATA_TYPE_BINARY
,
5
,
false
,
httpDestroySession
,
"rests"
);
tsHttpServer
.
sessionCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
5
,
false
,
httpDestroySession
,
"rests"
);
if
(
tsHttpServer
.
sessionCache
==
NULL
)
{
if
(
tsHttpServer
.
sessionCache
==
NULL
)
{
httpError
(
"failed to init session cache"
);
httpError
(
"failed to init session cache"
);
return
false
;
return
false
;
...
...
src/query/src/qExecutor.c
浏览文件 @
19ba6ef2
...
@@ -13,8 +13,10 @@
...
@@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "os.h"
#include "os.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tglobal.h"
#include "qfill.h"
#include "qfill.h"
#include "taosmsg.h"
#include "hash.h"
#include "hash.h"
#include "qExecutor.h"
#include "qExecutor.h"
...
@@ -87,16 +89,18 @@ typedef struct {
...
@@ -87,16 +89,18 @@ typedef struct {
STSCursor
cur
;
STSCursor
cur
;
}
SQueryStatusInfo
;
}
SQueryStatusInfo
;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
static UNUSED_FUNC void *u_malloc (size_t __size) {
//
uint32_t v = rand();
uint32_t v = rand();
//
if (v % 5 <= 1) {
if (v % 5 <= 1) {
//
return NULL;
return NULL;
//
} else {
} else {
return malloc(__size);
return malloc(__size);
//
}
}
}
}
#define malloc u_malloc
#define malloc u_malloc
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
...
@@ -1520,7 +1524,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -1520,7 +1524,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
static
bool
isQueryKilled
(
SQInfo
*
pQInfo
)
{
static
bool
isQueryKilled
(
SQInfo
*
pQInfo
)
{
return
false
;
return
(
pQInfo
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
);
return
(
pQInfo
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
}
...
@@ -5910,9 +5913,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
...
@@ -5910,9 +5913,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
typedef
struct
SQueryMgmt
{
SCacheObj
*
qinfoPool
;
// query handle pool
int32_t
vgId
;
bool
closed
;
pthread_mutex_t
lock
;
}
SQueryMgmt
;
int32_t
qCreateQueryInfo
(
void
*
tsdb
,
int32_t
vgId
,
SQueryTableMsg
*
pQueryMsg
,
void
*
param
,
_qinfo_free_fn_t
fn
,
int32_t
qCreateQueryInfo
(
void
*
tsdb
,
int32_t
vgId
,
SQueryTableMsg
*
pQueryMsg
,
void
*
param
,
_qinfo_free_fn_t
fn
,
qinfo_t
*
pQInfo
)
{
qinfo_t
*
pQInfo
)
{
assert
(
pQueryMsg
!=
NULL
);
assert
(
pQueryMsg
!=
NULL
&&
tsdb
!=
NULL
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
@@ -6356,3 +6366,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
...
@@ -6356,3 +6366,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
}
}
void
freeqinfoFn
(
void
*
qhandle
)
{
void
**
handle
=
qhandle
;
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
return
;
}
qKillQuery
(
*
handle
);
}
void
*
qOpenQueryMgmt
(
int32_t
vgId
)
{
const
int32_t
REFRESH_HANDLE_INTERVAL
=
2
;
// every 2 seconds, refresh handle pool
char
cacheName
[
128
]
=
{
0
};
sprintf
(
cacheName
,
"qhandle_%d"
,
vgId
);
SQueryMgmt
*
pQueryHandle
=
calloc
(
1
,
sizeof
(
SQueryMgmt
));
pQueryHandle
->
qinfoPool
=
taosCacheInit
(
TSDB_DATA_TYPE_BIGINT
,
REFRESH_HANDLE_INTERVAL
,
true
,
freeqinfoFn
,
cacheName
);
pQueryHandle
->
closed
=
false
;
pthread_mutex_init
(
&
pQueryHandle
->
lock
,
NULL
);
qDebug
(
"vgId:%d, open querymgmt success"
,
vgId
);
return
pQueryHandle
;
}
void
qSetQueryMgmtClosed
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
SQueryMgmt
*
pQueryMgmt
=
pQMgmt
;
qDebug
(
"vgId:%d, set querymgmt closed, wait for all queries cancelled"
,
pQueryMgmt
->
vgId
);
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
pQueryMgmt
->
closed
=
true
;
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
taosCacheEmpty
(
pQueryMgmt
->
qinfoPool
,
true
);
}
void
qCleanupQueryMgmt
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
SQueryMgmt
*
pQueryMgmt
=
pQMgmt
;
int32_t
vgId
=
pQueryMgmt
->
vgId
;
assert
(
pQueryMgmt
->
closed
);
SCacheObj
*
pqinfoPool
=
pQueryMgmt
->
qinfoPool
;
pQueryMgmt
->
qinfoPool
=
NULL
;
taosCacheCleanup
(
pqinfoPool
);
pthread_mutex_destroy
(
&
pQueryMgmt
->
lock
);
tfree
(
pQueryMgmt
);
qDebug
(
"vgId:%d querymgmt cleanup completed"
,
vgId
);
}
void
**
qRegisterQInfo
(
void
*
pMgmt
,
void
*
qInfo
)
{
if
(
pMgmt
==
NULL
)
{
return
NULL
;
}
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
return
NULL
;
}
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
return
NULL
;
}
else
{
void
**
handle
=
taosCachePut
(
pQueryMgmt
->
qinfoPool
,
qInfo
,
POINTER_BYTES
,
&
qInfo
,
POINTER_BYTES
,
tsShellActivityTimer
*
2
);
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
return
handle
;
}
}
void
**
qAcquireQInfo
(
void
*
pMgmt
,
void
**
key
)
{
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
||
pQueryMgmt
->
closed
)
{
return
NULL
;
}
void
**
handle
=
taosCacheAcquireByKey
(
pQueryMgmt
->
qinfoPool
,
key
,
POINTER_BYTES
);
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
return
NULL
;
}
else
{
return
handle
;
}
}
void
**
qReleaseQInfo
(
void
*
pMgmt
,
void
*
pQInfo
,
bool
needFree
)
{
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
return
NULL
;
}
taosCacheRelease
(
pQueryMgmt
->
qinfoPool
,
pQInfo
,
needFree
);
return
0
;
}
src/util/inc/tcache.h
浏览文件 @
19ba6ef2
...
@@ -65,7 +65,7 @@ typedef struct {
...
@@ -65,7 +65,7 @@ typedef struct {
int64_t
totalSize
;
// total allocated buffer in this hash table, SCacheObj is not included.
int64_t
totalSize
;
// total allocated buffer in this hash table, SCacheObj is not included.
int64_t
refreshTime
;
int64_t
refreshTime
;
STrashElem
*
pTrash
;
STrashElem
*
pTrash
;
c
onst
char
*
cacheN
ame
;
c
har
*
n
ame
;
// void * tmrCtrl;
// void * tmrCtrl;
// void * pTimer;
// void * pTimer;
SCacheStatis
statistics
;
SCacheStatis
statistics
;
...
@@ -163,8 +163,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
...
@@ -163,8 +163,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
/**
/**
* move all data node into trash, clear node in trash can if it is not referenced by any clients
* move all data node into trash, clear node in trash can if it is not referenced by any clients
* @param handle
* @param handle
* @param _remove remove the data or not if refcount is greater than 0
*/
*/
void
taosCacheEmpty
(
SCacheObj
*
pCacheObj
);
void
taosCacheEmpty
(
SCacheObj
*
pCacheObj
,
bool
_remove
);
/**
/**
* release all allocated memory and destroy the cache object.
* release all allocated memory and destroy the cache object.
...
...
src/util/src/tcache.c
浏览文件 @
19ba6ef2
...
@@ -119,9 +119,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
...
@@ -119,9 +119,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t
size
=
pNode
->
size
;
int32_t
size
=
pNode
->
size
;
taosHashRemove
(
pCacheObj
->
pHashTable
,
pNode
->
key
,
pNode
->
keySize
);
taosHashRemove
(
pCacheObj
->
pHashTable
,
pNode
->
key
,
pNode
->
keySize
);
uDebug
(
"key:%p, %p is destroyed from cache, totalNum:%d totalSize:%"
PRId64
"bytes size:%dbytes, cacheName:%s"
,
uDebug
(
"cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%"
PRId64
"bytes size:%dbytes"
,
pNode
->
key
,
pNode
->
data
,
(
int32_t
)
taosHashGetSize
(
pCacheObj
->
pHashTable
),
pCacheObj
->
totalSize
,
size
,
pCacheObj
->
name
,
pNode
->
key
,
pNode
->
data
,
(
int32_t
)
taosHashGetSize
(
pCacheObj
->
pHashTable
),
pCacheObj
->
totalSize
,
size
);
pCacheObj
->
cacheName
);
if
(
pCacheObj
->
freeFp
)
pCacheObj
->
freeFp
(
pNode
->
data
);
if
(
pCacheObj
->
freeFp
)
pCacheObj
->
freeFp
(
pNode
->
data
);
free
(
pNode
);
free
(
pNode
);
}
}
...
@@ -226,7 +225,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
...
@@ -226,7 +225,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
*/
*/
static
void
*
taosCacheRefresh
(
void
*
handle
);
static
void
*
taosCacheRefresh
(
void
*
handle
);
SCacheObj
*
taosCacheInit
WithCb
(
int32_t
keyType
,
int64_t
refreshTimeInSeconds
,
bool
extendLifespan
,
__cache_freeres_fn_t
fn
,
const
char
*
cacheName
)
{
SCacheObj
*
taosCacheInit
(
int32_t
keyType
,
int64_t
refreshTimeInSeconds
,
bool
extendLifespan
,
__cache_freeres_fn_t
fn
,
const
char
*
cacheName
)
{
if
(
refreshTimeInSeconds
<=
0
)
{
if
(
refreshTimeInSeconds
<=
0
)
{
return
NULL
;
return
NULL
;
}
}
...
@@ -238,7 +237,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
...
@@ -238,7 +237,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
}
}
pCacheObj
->
pHashTable
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
keyType
),
false
);
pCacheObj
->
pHashTable
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
keyType
),
false
);
pCacheObj
->
cacheName
=
cacheName
;
pCacheObj
->
name
=
strdup
(
cacheName
)
;
if
(
pCacheObj
->
pHashTable
==
NULL
)
{
if
(
pCacheObj
->
pHashTable
==
NULL
)
{
free
(
pCacheObj
);
free
(
pCacheObj
);
uError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
uError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
...
@@ -268,10 +267,6 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
...
@@ -268,10 +267,6 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
return
pCacheObj
;
return
pCacheObj
;
}
}
SCacheObj
*
taosCacheInit
(
int32_t
keyType
,
int64_t
refreshTimeInSeconds
,
bool
extendLifespan
,
__cache_freeres_fn_t
fn
,
const
char
*
cacheName
)
{
return
taosCacheInitWithCb
(
keyType
,
refreshTimeInSeconds
,
extendLifespan
,
fn
,
cacheName
);
}
void
*
taosCachePut
(
SCacheObj
*
pCacheObj
,
const
void
*
key
,
size_t
keyLen
,
const
void
*
pData
,
size_t
dataSize
,
int
duration
)
{
void
*
taosCachePut
(
SCacheObj
*
pCacheObj
,
const
void
*
key
,
size_t
keyLen
,
const
void
*
pData
,
size_t
dataSize
,
int
duration
)
{
SCacheDataNode
*
pNode
;
SCacheDataNode
*
pNode
;
...
@@ -288,16 +283,16 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
...
@@ -288,16 +283,16 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
if
(
NULL
!=
pNode
)
{
if
(
NULL
!=
pNode
)
{
pCacheObj
->
totalSize
+=
pNode
->
size
;
pCacheObj
->
totalSize
+=
pNode
->
size
;
uDebug
(
"key:%p, %p added into cache, added:%"
PRIu64
", expire:%"
PRIu64
", totalNum:%d totalSize:%"
PRId64
uDebug
(
"
cache:%s,
key:%p, %p added into cache, added:%"
PRIu64
", expire:%"
PRIu64
", totalNum:%d totalSize:%"
PRId64
"bytes size:%"
PRId64
"bytes
, cacheName:%s
"
,
"bytes size:%"
PRId64
"bytes"
,
key
,
pNode
->
data
,
pNode
->
addedTime
,
(
pNode
->
lifespan
*
pNode
->
extendFactor
+
pNode
->
addedTime
),
pCacheObj
->
name
,
key
,
pNode
->
data
,
pNode
->
addedTime
,
(
pNode
->
lifespan
*
pNode
->
extendFactor
+
pNode
->
addedTime
),
(
int32_t
)
taosHashGetSize
(
pCacheObj
->
pHashTable
),
pCacheObj
->
totalSize
,
dataSize
,
pCacheObj
->
cacheName
);
(
int32_t
)
taosHashGetSize
(
pCacheObj
->
pHashTable
),
pCacheObj
->
totalSize
,
dataSize
);
}
else
{
}
else
{
uError
(
"
key:%p, failed to added into cache, out of memory, cacheName:%s"
,
key
,
pCacheObj
->
cacheName
);
uError
(
"
cache:%s, key:%p, failed to added into cache, out of memory"
,
pCacheObj
->
name
,
key
);
}
}
}
else
{
// old data exists, update the node
}
else
{
// old data exists, update the node
pNode
=
taosUpdateCacheImpl
(
pCacheObj
,
pOld
,
key
,
keyLen
,
pData
,
dataSize
,
duration
*
1000L
);
pNode
=
taosUpdateCacheImpl
(
pCacheObj
,
pOld
,
key
,
keyLen
,
pData
,
dataSize
,
duration
*
1000L
);
uDebug
(
"
key:%p, %p exist in cache, updated, cacheName:%s"
,
key
,
pNode
->
data
,
pCacheObj
->
cacheName
);
uDebug
(
"
cache:%s, key:%p, %p exist in cache, updated"
,
pCacheObj
->
name
,
key
,
pNode
->
data
);
}
}
__cache_unlock
(
pCacheObj
);
__cache_unlock
(
pCacheObj
);
...
@@ -332,10 +327,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
...
@@ -332,10 +327,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
if
(
ptNode
!=
NULL
)
{
if
(
ptNode
!=
NULL
)
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
uDebug
(
"
key:%p, %p is retrieved from cache, refcnt:%d, cacheName:%s"
,
key
,
(
*
ptNode
)
->
data
,
ref
,
pCacheObj
->
cacheName
);
uDebug
(
"
cache:%s, key:%p, %p is retrieved from cache, refcnt:%d"
,
pCacheObj
->
name
,
key
,
(
*
ptNode
)
->
data
,
ref
);
}
else
{
}
else
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
uDebug
(
"
key:%p, not in cache, retrieved failed, cacheName:%s"
,
key
,
pCacheObj
->
cacheName
);
uDebug
(
"
cache:%s, key:%p, not in cache, retrieved failed"
,
pCacheObj
->
name
,
key
);
}
}
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
...
@@ -360,11 +355,11 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke
...
@@ -360,11 +355,11 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke
if
(
ptNode
!=
NULL
)
{
if
(
ptNode
!=
NULL
)
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
uDebug
(
"
key:%p, %p expireTime is updated in cache, refcnt:%d, cacheName:%s"
,
key
,
(
*
ptNode
)
->
data
,
uDebug
(
"
cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d"
,
pCacheObj
->
name
,
key
,
T_REF_VAL_GET
(
*
ptNode
),
pCacheObj
->
cacheName
);
(
*
ptNode
)
->
data
,
T_REF_VAL_GET
(
*
ptNode
)
);
}
else
{
}
else
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
uDebug
(
"
key:%p, not in cache, retrieved failed, cacheName:%s"
,
key
,
pCacheObj
->
cacheName
);
uDebug
(
"
cache:%s, key:%p, not in cache, retrieved failed"
,
pCacheObj
->
name
,
key
);
}
}
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
...
@@ -383,7 +378,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
...
@@ -383,7 +378,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
}
}
int32_t
ref
=
T_REF_INC
(
ptNode
);
int32_t
ref
=
T_REF_INC
(
ptNode
);
uDebug
(
"
%p acquired by data in cache, refcnt:%d, cacheName:%s"
,
ptNode
->
data
,
ref
,
pCacheObj
->
cacheName
);
uDebug
(
"
cache:%s, data: %p acquired by data in cache, refcnt:%d"
,
pCacheObj
->
name
,
ptNode
->
data
,
ref
);
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if
(
pCacheObj
->
extendLifespan
)
{
if
(
pCacheObj
->
extendLifespan
)
{
...
@@ -391,7 +386,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
...
@@ -391,7 +386,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if
((
now
-
ptNode
->
addedTime
)
<
ptNode
->
lifespan
*
ptNode
->
extendFactor
)
{
if
((
now
-
ptNode
->
addedTime
)
<
ptNode
->
lifespan
*
ptNode
->
extendFactor
)
{
ptNode
->
extendFactor
+=
1
;
ptNode
->
extendFactor
+=
1
;
uDebug
(
"
%p extend life time to %"
PRId64
,
ptNode
->
data
,
uDebug
(
"
cache:%s, %p extend life time to %"
PRId64
,
pCacheObj
->
name
,
ptNode
->
data
,
ptNode
->
lifespan
*
ptNode
->
extendFactor
+
ptNode
->
addedTime
);
ptNode
->
lifespan
*
ptNode
->
extendFactor
+
ptNode
->
addedTime
);
}
}
}
}
...
@@ -437,7 +432,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
...
@@ -437,7 +432,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
*
data
=
NULL
;
*
data
=
NULL
;
int16_t
ref
=
T_REF_DEC
(
pNode
);
int16_t
ref
=
T_REF_DEC
(
pNode
);
uDebug
(
"
key:%p, %p is released, refcnt:%d, cacheName:%s"
,
pNode
->
key
,
pNode
->
data
,
ref
,
pCacheObj
->
cacheName
);
uDebug
(
"
cache:%s, key:%p, %p is released, refcnt:%d"
,
pCacheObj
->
name
,
pNode
->
key
,
pNode
->
data
,
ref
);
if
(
_remove
&&
(
!
pNode
->
inTrashCan
))
{
if
(
_remove
&&
(
!
pNode
->
inTrashCan
))
{
__cache_wr_lock
(
pCacheObj
);
__cache_wr_lock
(
pCacheObj
);
...
@@ -455,7 +450,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
...
@@ -455,7 +450,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
}
}
}
}
void
taosCacheEmpty
(
SCacheObj
*
pCacheObj
)
{
void
taosCacheEmpty
(
SCacheObj
*
pCacheObj
,
bool
_remove
)
{
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pCacheObj
->
pHashTable
);
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pCacheObj
->
pHashTable
);
__cache_wr_lock
(
pCacheObj
);
__cache_wr_lock
(
pCacheObj
);
...
@@ -465,12 +460,16 @@ void taosCacheEmpty(SCacheObj *pCacheObj) {
...
@@ -465,12 +460,16 @@ void taosCacheEmpty(SCacheObj *pCacheObj) {
}
}
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
if
(
T_REF_VAL_GET
(
pNode
)
==
0
||
_remove
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
else
{
taosCacheMoveToTrash
(
pCacheObj
,
pNode
);
taosCacheMoveToTrash
(
pCacheObj
,
pNode
);
}
}
}
__cache_unlock
(
pCacheObj
);
__cache_unlock
(
pCacheObj
);
taosHashDestroyIter
(
pIter
);
taosHashDestroyIter
(
pIter
);
taosTrashCanEmpty
(
pCacheObj
,
fals
e
);
taosTrashCanEmpty
(
pCacheObj
,
_remov
e
);
}
}
void
taosCacheCleanup
(
SCacheObj
*
pCacheObj
)
{
void
taosCacheCleanup
(
SCacheObj
*
pCacheObj
)
{
...
@@ -481,7 +480,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
...
@@ -481,7 +480,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
pCacheObj
->
deleting
=
1
;
pCacheObj
->
deleting
=
1
;
pthread_join
(
pCacheObj
->
refreshWorker
,
NULL
);
pthread_join
(
pCacheObj
->
refreshWorker
,
NULL
);
uInfo
(
"cache
Name:%p, will be cleanuped"
,
pCacheObj
->
cacheN
ame
);
uInfo
(
"cache
:%s will be cleaned up"
,
pCacheObj
->
n
ame
);
doCleanupDataCache
(
pCacheObj
);
doCleanupDataCache
(
pCacheObj
);
}
}
...
@@ -601,22 +600,25 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
...
@@ -601,22 +600,25 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pCacheObj
->
pHashTable
);
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pCacheObj
->
pHashTable
);
while
(
taosHashIterNext
(
pIter
))
{
while
(
taosHashIterNext
(
pIter
))
{
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
// if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
if
(
T_REF_VAL_GET
(
pNode
)
<=
0
)
{
int32_t
c
=
T_REF_VAL_GET
(
pNode
);
if
(
c
<=
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
else
{
}
else
{
uDebug
(
"
key:%p, %p will not remove from cache, refcnt:%d, cacheName:%s"
,
pNode
->
key
,
pNode
->
data
,
uDebug
(
"
cache:%s key:%p, %p will not remove from cache, refcnt:%d"
,
pCacheObj
->
name
,
pNode
->
key
,
T_REF_VAL_GET
(
pNode
),
pCacheObj
->
cacheName
);
pNode
->
data
,
T_REF_VAL_GET
(
pNode
)
);
}
}
}
}
taosHashDestroyIter
(
pIter
);
taosHashDestroyIter
(
pIter
);
// todo memory leak if there are object with refcount greater than 0 in hash table?
taosHashCleanup
(
pCacheObj
->
pHashTable
);
taosHashCleanup
(
pCacheObj
->
pHashTable
);
__cache_unlock
(
pCacheObj
);
__cache_unlock
(
pCacheObj
);
taosTrashCanEmpty
(
pCacheObj
,
true
);
taosTrashCanEmpty
(
pCacheObj
,
true
);
__cache_lock_destroy
(
pCacheObj
);
__cache_lock_destroy
(
pCacheObj
);
tfree
(
pCacheObj
->
name
);
memset
(
pCacheObj
,
0
,
sizeof
(
SCacheObj
));
memset
(
pCacheObj
,
0
,
sizeof
(
SCacheObj
));
free
(
pCacheObj
);
free
(
pCacheObj
);
}
}
...
...
src/vnode/inc/vnodeInt.h
浏览文件 @
19ba6ef2
...
@@ -53,7 +53,7 @@ typedef struct {
...
@@ -53,7 +53,7 @@ typedef struct {
STsdbCfg
tsdbCfg
;
STsdbCfg
tsdbCfg
;
SSyncCfg
syncCfg
;
SSyncCfg
syncCfg
;
SWalCfg
walCfg
;
SWalCfg
walCfg
;
void
*
q
HandlePool
;
// query handle pool
void
*
q
Mgmt
;
char
*
rootDir
;
char
*
rootDir
;
char
db
[
TSDB_DB_NAME_LEN
];
char
db
[
TSDB_DB_NAME_LEN
];
}
SVnodeObj
;
}
SVnodeObj
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
19ba6ef2
...
@@ -46,7 +46,6 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
...
@@ -46,7 +46,6 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
);
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
);
static
void
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
);
static
void
vnodeFreeqHandle
(
void
*
phandle
);
static
pthread_once_t
vnodeModuleInit
=
PTHREAD_ONCE_INIT
;
static
pthread_once_t
vnodeModuleInit
=
PTHREAD_ONCE_INIT
;
...
@@ -283,9 +282,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
...
@@ -283,9 +282,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if
(
pVnode
->
role
==
TAOS_SYNC_ROLE_MASTER
)
if
(
pVnode
->
role
==
TAOS_SYNC_ROLE_MASTER
)
cqStart
(
pVnode
->
cq
);
cqStart
(
pVnode
->
cq
);
const
int32_t
REFRESH_HANDLE_INTERVAL
=
2
;
// every 2 seconds, rfresh handle pool
pVnode
->
qMgmt
=
qOpenQueryMgmt
(
pVnode
->
vgId
);
pVnode
->
qHandlePool
=
taosCacheInit
(
TSDB_DATA_TYPE_BIGINT
,
REFRESH_HANDLE_INTERVAL
,
true
,
vnodeFreeqHandle
,
"qhandle"
);
pVnode
->
events
=
NULL
;
pVnode
->
events
=
NULL
;
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
vDebug
(
"vgId:%d, vnode is opened in %s, pVnode:%p"
,
pVnode
->
vgId
,
rootDir
,
pVnode
);
vDebug
(
"vgId:%d, vnode is opened in %s, pVnode:%p"
,
pVnode
->
vgId
,
rootDir
,
pVnode
);
...
@@ -328,6 +325,9 @@ void vnodeRelease(void *pVnodeRaw) {
...
@@ -328,6 +325,9 @@ void vnodeRelease(void *pVnodeRaw) {
return
;
return
;
}
}
qCleanupQueryMgmt
(
pVnode
->
qMgmt
);
pVnode
->
qMgmt
=
NULL
;
if
(
pVnode
->
tsdb
)
if
(
pVnode
->
tsdb
)
tsdbCloseRepo
(
pVnode
->
tsdb
,
1
);
tsdbCloseRepo
(
pVnode
->
tsdb
,
1
);
pVnode
->
tsdb
=
NULL
;
pVnode
->
tsdb
=
NULL
;
...
@@ -393,6 +393,15 @@ void *vnodeAccquireVnode(int32_t vgId) {
...
@@ -393,6 +393,15 @@ void *vnodeAccquireVnode(int32_t vgId) {
return
pVnode
;
return
pVnode
;
}
}
void
*
vnodeAccquireRqueue
(
void
*
param
)
{
SVnodeObj
*
pVnode
=
param
;
if
(
pVnode
==
NULL
)
return
NULL
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
vDebug
(
"vgId:%d, get vnode rqueue, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
return
((
SVnodeObj
*
)
pVnode
)
->
rqueue
;
}
void
*
vnodeGetRqueue
(
void
*
pVnode
)
{
void
*
vnodeGetRqueue
(
void
*
pVnode
)
{
return
((
SVnodeObj
*
)
pVnode
)
->
rqueue
;
return
((
SVnodeObj
*
)
pVnode
)
->
rqueue
;
}
}
...
@@ -466,7 +475,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
...
@@ -466,7 +475,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace
(
"vgId:%d, vnode will cleanup, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
vTrace
(
"vgId:%d, vnode will cleanup, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
// release local resources only after cutting off outside connections
// release local resources only after cutting off outside connections
taosCacheCleanup
(
pVnode
->
qHandlePool
);
qSetQueryMgmtClosed
(
pVnode
->
qMgmt
);
vnodeRelease
(
pVnode
);
vnodeRelease
(
pVnode
);
}
}
...
@@ -872,12 +881,3 @@ PARSE_OVER:
...
@@ -872,12 +881,3 @@ PARSE_OVER:
if
(
fp
)
fclose
(
fp
);
if
(
fp
)
fclose
(
fp
);
return
terrno
;
return
terrno
;
}
}
void
vnodeFreeqHandle
(
void
*
qHandle
)
{
void
**
handle
=
qHandle
;
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
return
;
}
qKillQuery
(
*
handle
);
}
\ No newline at end of file
src/vnode/src/vnodeRead.c
浏览文件 @
19ba6ef2
...
@@ -14,6 +14,7 @@
...
@@ -14,6 +14,7 @@
*/
*/
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include <dnode.h>
#include "os.h"
#include "os.h"
#include "tglobal.h"
#include "tglobal.h"
...
@@ -73,18 +74,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
...
@@ -73,18 +74,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
killQueryMsg
->
free
=
htons
(
killQueryMsg
->
free
);
killQueryMsg
->
free
=
htons
(
killQueryMsg
->
free
);
killQueryMsg
->
qhandle
=
htobe64
(
killQueryMsg
->
qhandle
);
killQueryMsg
->
qhandle
=
htobe64
(
killQueryMsg
->
qhandle
);
vWarn
(
"QInfo:%p connection %p broken, kill query"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pReadMsg
->
rpcMsg
.
handle
);
void
*
handle
=
NULL
;
if
((
void
**
)
killQueryMsg
->
qhandle
!=
NULL
)
{
handle
=
*
(
void
**
)
killQueryMsg
->
qhandle
;
}
vWarn
(
"QInfo:%p connection %p broken, kill query"
,
handle
,
pReadMsg
->
rpcMsg
.
handle
);
assert
(
pReadMsg
->
rpcMsg
.
contLen
>
0
&&
killQueryMsg
->
free
==
1
);
assert
(
pReadMsg
->
rpcMsg
.
contLen
>
0
&&
killQueryMsg
->
free
==
1
);
// this message arrived here by means of the *query* message, so release the vnode is necessary
void
**
qhandle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
killQueryMsg
->
qhandle
);
void
**
qhandle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
(
void
*
)
&
killQueryMsg
->
qhandle
,
sizeof
(
killQueryMsg
->
qhandle
));
if
(
qhandle
==
NULL
||
*
qhandle
==
NULL
)
{
if
(
qhandle
==
NULL
||
*
qhandle
==
NULL
)
{
vWarn
(
"QInfo:%p invalid qhandle, no matched query handle, conn:%p"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pReadMsg
->
rpcMsg
.
handle
);
vWarn
(
"QInfo:%p invalid qhandle, no matched query handle, conn:%p"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pReadMsg
->
rpcMsg
.
handle
);
}
else
{
}
else
{
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
qhandle
,
true
);
assert
(
qhandle
==
(
void
**
)
killQueryMsg
->
qhandle
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
qhandle
,
true
);
}
}
vnodeRelease
(
pVnode
);
return
TSDB_CODE_TSC_QUERY_CANCELLED
;
return
TSDB_CODE_TSC_QUERY_CANCELLED
;
}
}
...
@@ -93,7 +98,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
...
@@ -93,7 +98,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void
**
handle
=
NULL
;
void
**
handle
=
NULL
;
if
(
contLen
!=
0
)
{
if
(
contLen
!=
0
)
{
code
=
qCreateQueryInfo
(
pVnode
->
tsdb
,
pVnode
->
vgId
,
pQueryTableMsg
,
pVnode
,
vnodeRelease
,
&
pQInfo
);
code
=
qCreateQueryInfo
(
pVnode
->
tsdb
,
pVnode
->
vgId
,
pQueryTableMsg
,
pVnode
,
NULL
,
&
pQInfo
);
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
code
;
pRsp
->
code
=
code
;
...
@@ -105,25 +110,30 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
...
@@ -105,25 +110,30 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
// current connect is broken
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
vnodeNotifyCurrentQhandle
(
pReadMsg
->
rpcMsg
.
handle
,
pQInfo
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
// add lock here
vError
(
"vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p"
,
pVnode
->
vgId
,
pQInfo
,
handle
=
qRegisterQInfo
(
pVnode
->
qMgmt
,
pQInfo
);
pReadMsg
->
rpcMsg
.
handle
);
if
(
handle
==
NULL
)
{
// failed to register qhandle
pRsp
->
code
=
TSDB_CODE_
RPC_NETWORK_UNAVAIL
;
pRsp
->
code
=
TSDB_CODE_
QRY_INVALID_QHANDLE
;
// NOTE: there two refcount, needs to kill twice, todo refactor
// query has not been put into qhandle pool, kill it directly.
qKillQuery
(
pQInfo
);
qKillQuery
(
pQInfo
);
qKillQuery
(
pQInfo
);
qKillQuery
(
pQInfo
);
}
else
{
assert
(
*
handle
==
pQInfo
);
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
handle
));
}
if
(
handle
!=
NULL
&&
vnodeNotifyCurrentQhandle
(
pReadMsg
->
rpcMsg
.
handle
,
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%p, query discarded since link is broken, %p"
,
pVnode
->
vgId
,
pQInfo
,
pReadMsg
->
rpcMsg
.
handle
);
pRsp
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
// NOTE: there two refcount, needs to kill twice
// query has not been put into qhandle pool, kill it directly.
qKillQuery
(
pQInfo
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
return
pRsp
->
code
;
return
pRsp
->
code
;
}
}
handle
=
taosCachePut
(
pVnode
->
qHandlePool
,
pQInfo
,
sizeof
(
pQInfo
),
&
pQInfo
,
sizeof
(
pQInfo
),
tsShellActivityTimer
*
2
);
assert
(
*
handle
==
pQInfo
);
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
handle
));
}
else
{
}
else
{
assert
(
pQInfo
==
NULL
);
assert
(
pQInfo
==
NULL
);
vnodeRelease
(
pVnode
);
}
}
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
vgId
,
pQInfo
);
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
vgId
,
pQInfo
);
...
@@ -138,9 +148,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
...
@@ -138,9 +148,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if
(
pQInfo
!=
NULL
)
{
if
(
pQInfo
!=
NULL
)
{
qTableQuery
(
pQInfo
);
// do execute query
qTableQuery
(
pQInfo
);
// do execute query
assert
(
handle
!=
NULL
);
assert
(
handle
!=
NULL
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
false
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
false
);
}
}
return
code
;
return
code
;
...
@@ -159,7 +168,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
...
@@ -159,7 +168,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
int32_t
ret
=
0
;
int32_t
ret
=
0
;
void
**
handle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
pQInfo
,
sizeof
(
pQInfo
)
);
void
**
handle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
pQInfo
);
if
(
handle
==
NULL
||
handle
!=
pQInfo
)
{
if
(
handle
==
NULL
||
handle
!=
pQInfo
)
{
ret
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
ret
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
}
...
@@ -167,8 +176,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
...
@@ -167,8 +176,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if
(
pRetrieve
->
free
==
1
)
{
if
(
pRetrieve
->
free
==
1
)
{
if
(
ret
==
TSDB_CODE_SUCCESS
)
{
if
(
ret
==
TSDB_CODE_SUCCESS
)
{
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pQInfo
);
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pQInfo
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
true
);
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
...
@@ -178,27 +187,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
...
@@ -178,27 +187,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp
->
completed
=
true
;
pRsp
->
completed
=
true
;
pRsp
->
useconds
=
0
;
pRsp
->
useconds
=
0
;
}
else
{
// todo handle error
}
else
{
// todo handle error
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
}
}
return
ret
;
return
ret
;
}
}
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is received"
,
pVnode
->
vgId
,
*
pQInfo
);
int32_t
code
=
qRetrieveQueryResultInfo
(
*
pQInfo
);
int32_t
code
=
qRetrieveQueryResultInfo
(
*
pQInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
||
ret
!=
TSDB_CODE_SUCCESS
)
{
//TODO
//TODO
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
}
else
{
}
else
{
// todo check code and handle error in build result set
// todo check code and handle error in build result set
code
=
qDumpRetrieveResult
(
*
pQInfo
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
);
code
=
qDumpRetrieveResult
(
*
pQInfo
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
);
if
(
qHasMoreResultsToRetrieve
(
*
pQInfo
))
{
if
(
qHasMoreResultsToRetrieve
(
*
handle
))
{
dnodePutQhandleIntoReadQueue
(
pVnode
,
handle
);
pRet
->
qhandle
=
handle
;
pRet
->
qhandle
=
handle
;
code
=
TSDB_CODE_
VND_ACTION_NEED_REPROCESSED
;
code
=
TSDB_CODE_
SUCCESS
;
}
else
{
// no further execution invoked, release the ref to vnode
}
else
{
// no further execution invoked, release the ref to vnode
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
true
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
}
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录