Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9c1fcd5e
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9c1fcd5e
编写于
7月 02, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
7月 02, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2526 from taosdata/feature/query
Feature/query
上级
2b7f00e7
a9ef9ff9
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
276 addition
and
199 deletion
+276
-199
src/client/src/tscServer.c
src/client/src/tscServer.c
+8
-8
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+1
-1
src/inc/query.h
src/inc/query.h
+4
-7
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+8
-8
src/mnode/src/mnodeShow.c
src/mnode/src/mnodeShow.c
+5
-5
src/plugins/http/src/httpContext.c
src/plugins/http/src/httpContext.c
+5
-5
src/plugins/http/src/httpSession.c
src/plugins/http/src/httpSession.c
+5
-5
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+11
-9
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+22
-17
src/util/inc/tcache.h
src/util/inc/tcache.h
+27
-21
src/util/src/tcache.c
src/util/src/tcache.c
+95
-65
src/util/tests/cacheTest.cpp
src/util/tests/cacheTest.cpp
+14
-14
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+1
-0
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+23
-7
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+47
-27
未找到文件。
src/client/src/tscServer.c
浏览文件 @
9c1fcd5e
...
...
@@ -1675,8 +1675,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
,
0
);
assert
(
pTableMetaInfo
->
pTableMeta
==
NULL
);
pTableMetaInfo
->
pTableMeta
=
(
STableMeta
*
)
taosCachePut
(
tscCacheHandle
,
pTableMetaInfo
->
name
,
pTableMeta
,
size
,
tsTableMetaKeepTimer
);
pTableMetaInfo
->
pTableMeta
=
(
STableMeta
*
)
taosCachePut
(
tscCacheHandle
,
pTableMetaInfo
->
name
,
strlen
(
pTableMetaInfo
->
name
)
,
pTableMeta
,
size
,
tsTableMetaKeepTimer
);
// todo handle out of memory case
if
(
pTableMetaInfo
->
pTableMeta
==
NULL
)
{
...
...
@@ -1879,7 +1879,8 @@ int tscProcessShowRsp(SSqlObj *pSql) {
size_t
size
=
0
;
STableMeta
*
pTableMeta
=
tscCreateTableMetaFromMsg
(
pMetaMsg
,
&
size
);
pTableMetaInfo
->
pTableMeta
=
taosCachePut
(
tscCacheHandle
,
key
,
(
char
*
)
pTableMeta
,
size
,
tsTableMetaKeepTimer
);
pTableMetaInfo
->
pTableMeta
=
taosCachePut
(
tscCacheHandle
,
key
,
strlen
(
key
),
(
char
*
)
pTableMeta
,
size
,
tsTableMetaKeepTimer
);
SSchema
*
pTableSchema
=
tscGetTableSchema
(
pTableMetaInfo
->
pTableMeta
);
if
(
pQueryInfo
->
colList
==
NULL
)
{
...
...
@@ -1949,9 +1950,8 @@ int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
int
tscProcessDropTableRsp
(
SSqlObj
*
pSql
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
,
0
);
STableMeta
*
pTableMeta
=
taosCacheAcquireByName
(
tscCacheHandle
,
pTableMetaInfo
->
name
);
if
(
pTableMeta
==
NULL
)
{
/* not in cache, abort */
STableMeta
*
pTableMeta
=
taosCacheAcquireByKey
(
tscCacheHandle
,
pTableMetaInfo
->
name
,
strlen
(
pTableMetaInfo
->
name
));
if
(
pTableMeta
==
NULL
)
{
/* not in cache, abort */
return
0
;
}
...
...
@@ -1975,7 +1975,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
int
tscProcessAlterTableMsgRsp
(
SSqlObj
*
pSql
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
,
0
);
STableMeta
*
pTableMeta
=
taosCacheAcquireBy
Name
(
tscCacheHandle
,
pTableMetaInfo
->
name
);
STableMeta
*
pTableMeta
=
taosCacheAcquireBy
Key
(
tscCacheHandle
,
pTableMetaInfo
->
name
,
strlen
(
pTableMetaInfo
->
name
)
);
if
(
pTableMeta
==
NULL
)
{
/* not in cache, abort */
return
0
;
}
...
...
@@ -2125,7 +2125,7 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
(
pTableMetaInfo
->
pTableMeta
),
false
);
}
pTableMetaInfo
->
pTableMeta
=
(
STableMeta
*
)
taosCacheAcquireBy
Name
(
tscCacheHandle
,
pTableMetaInfo
->
name
);
pTableMetaInfo
->
pTableMeta
=
(
STableMeta
*
)
taosCacheAcquireBy
Key
(
tscCacheHandle
,
pTableMetaInfo
->
name
,
strlen
(
pTableMetaInfo
->
name
)
);
if
(
pTableMetaInfo
->
pTableMeta
!=
NULL
)
{
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
tscDebug
(
"%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p"
,
pSql
,
tinfo
.
numOfColumns
,
...
...
src/client/src/tscSystem.c
浏览文件 @
9c1fcd5e
...
...
@@ -148,7 +148,7 @@ void taos_init_imp() {
refreshTime
=
refreshTime
<
10
?
10
:
refreshTime
;
if
(
tscCacheHandle
==
NULL
)
{
tscCacheHandle
=
taosCacheInit
(
refreshTime
);
tscCacheHandle
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
refreshTime
,
false
,
NULL
);
}
tscDebug
(
"client is initialized successfully"
);
...
...
src/inc/query.h
浏览文件 @
9c1fcd5e
...
...
@@ -20,6 +20,7 @@ extern "C" {
#endif
typedef
void
*
qinfo_t
;
typedef
void
(
*
_qinfo_free_fn_t
)(
void
*
);
/**
* create the qinfo object according to QueryTableMsg
...
...
@@ -28,15 +29,13 @@ typedef void* qinfo_t;
* @param qinfo
* @return
*/
int32_t
qCreateQueryInfo
(
void
*
tsdb
,
int32_t
vgId
,
SQueryTableMsg
*
pQueryTableMsg
,
qinfo_t
*
qinfo
);
int32_t
qCreateQueryInfo
(
void
*
tsdb
,
int32_t
vgId
,
SQueryTableMsg
*
pQueryTableMsg
,
void
*
param
,
_qinfo_free_fn_t
fn
,
qinfo_t
*
qinfo
);
/**
* Destroy QInfo object
* @param qinfo qhandle
* @param fp destroy callback function, while the qhandle is destoried, invoke the fp
* @param param free callback params
*/
void
qDestroyQueryInfo
(
qinfo_t
qinfo
,
void
(
*
fp
)(
void
*
),
void
*
param
);
void
qDestroyQueryInfo
(
qinfo_t
qinfo
);
/**
* the main query execution function, including query on both table and multitables,
...
...
@@ -81,11 +80,9 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
/**
* kill current ongoing query and free query handle automatically
* @param qinfo qhandle
* @param fp destroy callback function, while the qhandle is destoried, invoke the fp
* @param param free callback params
* @return
*/
int32_t
qKillQuery
(
qinfo_t
qinfo
,
void
(
*
fp
)(
void
*
),
void
*
param
);
int32_t
qKillQuery
(
qinfo_t
qinfo
);
#ifdef __cplusplus
}
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
9c1fcd5e
...
...
@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle
(
TSDB_MSG_TYPE_CM_KILL_STREAM
,
mnodeProcessKillStreamMsg
);
mnodeAddWriteMsgHandle
(
TSDB_MSG_TYPE_CM_KILL_CONN
,
mnodeProcessKillConnectionMsg
);
tsMnodeConnCache
=
taosCacheInitWithCb
(
CONN_CHECK_TIME
,
mnodeFreeConn
);
tsMnodeConnCache
=
taosCacheInitWithCb
(
TSDB_DATA_TYPE_BINARY
,
CONN_CHECK_TIME
,
false
,
mnodeFreeConn
);
return
0
;
}
...
...
@@ -101,8 +101,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
tstrncpy
(
connObj
.
user
,
user
,
sizeof
(
connObj
.
user
));
char
key
[
10
];
sprintf
(
key
,
"%u"
,
connId
);
SConnObj
*
pConn
=
taosCachePut
(
tsMnodeConnCache
,
key
,
&
connObj
,
sizeof
(
connObj
),
CONN_KEEP_TIME
);
int32_t
len
=
sprintf
(
key
,
"%u"
,
connId
);
SConnObj
*
pConn
=
taosCachePut
(
tsMnodeConnCache
,
key
,
len
,
&
connObj
,
sizeof
(
connObj
),
CONN_KEEP_TIME
);
mDebug
(
"connId:%d, is created, user:%s ip:%s:%u"
,
connId
,
user
,
taosIpStr
(
ip
),
port
);
return
pConn
;
...
...
@@ -115,10 +115,10 @@ void mnodeReleaseConn(SConnObj *pConn) {
SConnObj
*
mnodeAccquireConn
(
uint32_t
connId
,
char
*
user
,
uint32_t
ip
,
uint16_t
port
)
{
char
key
[
10
];
sprintf
(
key
,
"%u"
,
connId
);
int32_t
len
=
sprintf
(
key
,
"%u"
,
connId
);
uint64_t
expireTime
=
CONN_KEEP_TIME
*
1000
+
(
uint64_t
)
taosGetTimestampMs
();
SConnObj
*
pConn
=
taosCacheUpdateExpireTimeByName
(
tsMnodeConnCache
,
key
,
expireTime
);
SConnObj
*
pConn
=
taosCacheUpdateExpireTimeByName
(
tsMnodeConnCache
,
key
,
len
,
expireTime
);
if
(
pConn
==
NULL
)
{
mError
(
"connId:%d, is already destroyed, user:%s ip:%s:%u"
,
connId
,
user
,
taosIpStr
(
ip
),
port
);
return
NULL
;
...
...
@@ -547,7 +547,7 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) {
int32_t
queryId
=
(
int32_t
)
strtol
(
queryIdStr
,
NULL
,
10
);
SConnObj
*
pConn
=
taosCacheAcquireBy
Name
(
tsMnodeConnCache
,
connIdStr
);
SConnObj
*
pConn
=
taosCacheAcquireBy
Key
(
tsMnodeConnCache
,
connIdStr
,
strlen
(
connIdStr
)
);
if
(
pConn
==
NULL
)
{
mError
(
"connId:%s, failed to kill queryId:%d, conn not exist"
,
connIdStr
,
queryId
);
return
TSDB_CODE_MND_INVALID_CONN_ID
;
...
...
@@ -577,7 +577,7 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) {
int32_t
streamId
=
(
int32_t
)
strtol
(
streamIdStr
,
NULL
,
10
);
SConnObj
*
pConn
=
taosCacheAcquireBy
Name
(
tsMnodeConnCache
,
connIdStr
);
SConnObj
*
pConn
=
taosCacheAcquireBy
Key
(
tsMnodeConnCache
,
connIdStr
,
strlen
(
connIdStr
)
);
if
(
pConn
==
NULL
)
{
mError
(
"connId:%s, failed to kill streamId:%d, conn not exist"
,
connIdStr
,
streamId
);
return
TSDB_CODE_MND_INVALID_CONN_ID
;
...
...
@@ -594,7 +594,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
if
(
strcmp
(
pUser
->
user
,
TSDB_DEFAULT_USER
)
!=
0
)
return
TSDB_CODE_MND_NO_RIGHTS
;
SCMKillConnMsg
*
pKill
=
pMsg
->
rpcMsg
.
pCont
;
SConnObj
*
pConn
=
taosCacheAcquireBy
Name
(
tsMnodeConnCache
,
pKill
->
queryId
);
SConnObj
*
pConn
=
taosCacheAcquireBy
Key
(
tsMnodeConnCache
,
pKill
->
queryId
,
strlen
(
pKill
->
queryId
)
);
if
(
pConn
==
NULL
)
{
mError
(
"connId:%s, failed to kill, conn not exist"
,
pKill
->
queryId
);
return
TSDB_CODE_MND_INVALID_CONN_ID
;
...
...
src/mnode/src/mnodeShow.c
浏览文件 @
9c1fcd5e
...
...
@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mnodeProcessConnectMsg
);
mnodeAddReadMsgHandle
(
TSDB_MSG_TYPE_CM_USE_DB
,
mnodeProcessUseMsg
);
tsMnodeShowCache
=
taosCacheInitWithCb
(
5
,
mnodeFreeShowObj
);
tsMnodeShowCache
=
taosCacheInitWithCb
(
TSDB_DATA_TYPE_BINARY
,
5
,
false
,
mnodeFreeShowObj
);
return
0
;
}
...
...
@@ -365,9 +365,9 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
static
bool
mnodeAccquireShowObj
(
SShowObj
*
pShow
)
{
char
key
[
10
];
sprintf
(
key
,
"%d"
,
pShow
->
index
);
int32_t
len
=
sprintf
(
key
,
"%d"
,
pShow
->
index
);
SShowObj
*
pSaved
=
taosCacheAcquireBy
Name
(
tsMnodeShowCache
,
key
);
SShowObj
*
pSaved
=
taosCacheAcquireBy
Key
(
tsMnodeShowCache
,
key
,
len
);
if
(
pSaved
==
pShow
)
{
mDebug
(
"%p, show is accquired from cache"
,
pShow
);
return
true
;
...
...
@@ -380,9 +380,9 @@ static void *mnodePutShowObj(SShowObj *pShow, int32_t size) {
if
(
tsMnodeShowCache
!=
NULL
)
{
char
key
[
10
];
pShow
->
index
=
atomic_add_fetch_32
(
&
tsShowObjIndex
,
1
);
sprintf
(
key
,
"%d"
,
pShow
->
index
);
int32_t
len
=
sprintf
(
key
,
"%d"
,
pShow
->
index
);
SShowObj
*
newQhandle
=
taosCachePut
(
tsMnodeShowCache
,
key
,
pShow
,
size
,
6
);
SShowObj
*
newQhandle
=
taosCachePut
(
tsMnodeShowCache
,
key
,
len
,
pShow
,
size
,
6
);
free
(
pShow
);
mDebug
(
"%p, show is put into cache, index:%s"
,
newQhandle
,
key
);
...
...
src/plugins/http/src/httpContext.c
浏览文件 @
9c1fcd5e
...
...
@@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) {
}
bool
httpInitContexts
()
{
tsHttpServer
.
contextCache
=
taosCacheInitWithCb
(
2
,
httpDestroyContext
);
tsHttpServer
.
contextCache
=
taosCacheInitWithCb
(
TSDB_DATA_TYPE_BINARY
,
2
,
false
,
httpDestroyContext
);
if
(
tsHttpServer
.
contextCache
==
NULL
)
{
httpError
(
"failed to init context cache"
);
return
false
;
...
...
@@ -104,14 +104,14 @@ HttpContext *httpCreateContext(int32_t fd) {
if
(
pContext
==
NULL
)
return
NULL
;
char
contextStr
[
16
]
=
{
0
};
snprintf
(
contextStr
,
sizeof
(
contextStr
),
"%p"
,
pContext
);
int32_t
keySize
=
snprintf
(
contextStr
,
sizeof
(
contextStr
),
"%p"
,
pContext
);
pContext
->
fd
=
fd
;
pContext
->
httpVersion
=
HTTP_VERSION_10
;
pContext
->
lastAccessTime
=
taosGetTimestampSec
();
pContext
->
state
=
HTTP_CONTEXT_STATE_READY
;
HttpContext
**
ppContext
=
taosCachePut
(
tsHttpServer
.
contextCache
,
contextStr
,
&
pContext
,
sizeof
(
HttpContext
*
),
3
);
HttpContext
**
ppContext
=
taosCachePut
(
tsHttpServer
.
contextCache
,
contextStr
,
keySize
,
&
pContext
,
sizeof
(
HttpContext
*
),
3
);
pContext
->
ppContext
=
ppContext
;
httpDebug
(
"context:%p, fd:%d, is created, item:%p"
,
pContext
,
fd
,
ppContext
);
...
...
@@ -123,9 +123,9 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext
*
httpGetContext
(
void
*
ptr
)
{
char
contextStr
[
16
]
=
{
0
};
snprintf
(
contextStr
,
sizeof
(
contextStr
),
"%p"
,
ptr
);
int32_t
len
=
snprintf
(
contextStr
,
sizeof
(
contextStr
),
"%p"
,
ptr
);
HttpContext
**
ppContext
=
taosCacheAcquireBy
Name
(
tsHttpServer
.
contextCache
,
contextStr
);
HttpContext
**
ppContext
=
taosCacheAcquireBy
Key
(
tsHttpServer
.
contextCache
,
contextStr
,
len
);
if
(
ppContext
)
{
HttpContext
*
pContext
=
*
ppContext
;
...
...
src/plugins/http/src/httpSession.c
浏览文件 @
9c1fcd5e
...
...
@@ -33,9 +33,9 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
memset
(
&
session
,
0
,
sizeof
(
HttpSession
));
session
.
taos
=
taos
;
session
.
refCount
=
1
;
snprintf
(
session
.
id
,
HTTP_SESSION_ID_LEN
,
"%s.%s"
,
pContext
->
user
,
pContext
->
pass
);
int32_t
len
=
snprintf
(
session
.
id
,
HTTP_SESSION_ID_LEN
,
"%s.%s"
,
pContext
->
user
,
pContext
->
pass
);
pContext
->
session
=
taosCachePut
(
server
->
sessionCache
,
session
.
id
,
&
session
,
sizeof
(
HttpSession
),
tsHttpSessionExpire
);
pContext
->
session
=
taosCachePut
(
server
->
sessionCache
,
session
.
id
,
len
,
&
session
,
sizeof
(
HttpSession
),
tsHttpSessionExpire
);
// void *temp = pContext->session;
// taosCacheRelease(server->sessionCache, (void **)&temp, false);
...
...
@@ -57,9 +57,9 @@ static void httpFetchSessionImp(HttpContext *pContext) {
pthread_mutex_lock
(
&
server
->
serverMutex
);
char
sessionId
[
HTTP_SESSION_ID_LEN
];
snprintf
(
sessionId
,
HTTP_SESSION_ID_LEN
,
"%s.%s"
,
pContext
->
user
,
pContext
->
pass
);
int32_t
len
=
snprintf
(
sessionId
,
HTTP_SESSION_ID_LEN
,
"%s.%s"
,
pContext
->
user
,
pContext
->
pass
);
pContext
->
session
=
taosCacheAcquireBy
Name
(
server
->
sessionCache
,
sessionId
);
pContext
->
session
=
taosCacheAcquireBy
Key
(
server
->
sessionCache
,
sessionId
,
len
);
if
(
pContext
->
session
!=
NULL
)
{
atomic_add_fetch_32
(
&
pContext
->
session
->
refCount
,
1
);
httpDebug
(
"context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d"
,
pContext
,
pContext
->
fd
,
...
...
@@ -115,7 +115,7 @@ void httpCleanUpSessions() {
}
bool
httpInitSessions
()
{
tsHttpServer
.
sessionCache
=
taosCacheInitWithCb
(
5
,
httpDestroySession
);
tsHttpServer
.
sessionCache
=
taosCacheInitWithCb
(
TSDB_DATA_TYPE_BINARY
,
5
,
false
,
httpDestroySession
);
if
(
tsHttpServer
.
sessionCache
==
NULL
)
{
httpError
(
"failed to init session cache"
);
return
false
;
...
...
src/query/inc/qExecutor.h
浏览文件 @
9c1fcd5e
...
...
@@ -27,6 +27,7 @@
#include "tref.h"
#include "tsdb.h"
#include "tsqlfunction.h"
#include "query.h"
struct
SColumnFilterElem
;
typedef
bool
(
*
__filter_func_t
)(
struct
SColumnFilterElem
*
pFilter
,
char
*
val1
,
char
*
val2
);
...
...
@@ -181,13 +182,13 @@ typedef struct SQueryRuntimeEnv {
}
SQueryRuntimeEnv
;
typedef
struct
SQInfo
{
void
*
signature
;
int32_t
pointsInterpo
;
int32_t
code
;
// error code to returned to client
sem_t
dataReady
;
void
*
tsdb
;
int32_t
vgId
;
void
*
signature
;
int32_t
pointsInterpo
;
int32_t
code
;
// error code to returned to client
sem_t
dataReady
;
void
*
tsdb
;
void
*
param
;
int32_t
vgId
;
STableGroupInfo
tableGroupInfo
;
// table id list < only includes the STable list>
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv
runtimeEnv
;
...
...
@@ -202,8 +203,9 @@ typedef struct SQInfo {
* We later may refactor to remove this attribution by using another flag to denote
* whether a multimeter query is completed or not.
*/
int32_t
tableIndex
;
int32_t
numOfGroupResultPages
;
int32_t
tableIndex
;
int32_t
numOfGroupResultPages
;
_qinfo_free_fn_t
fn
;
}
SQInfo
;
#endif // TDENGINE_QUERYEXECUTOR_H
src/query/src/qExecutor.c
浏览文件 @
9c1fcd5e
...
...
@@ -4069,7 +4069,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
return
pFillCol
;
}
int32_t
doInitQInfo
(
SQInfo
*
pQInfo
,
void
*
param
,
void
*
tsdb
,
int32_t
vgId
,
bool
isSTableQuery
)
{
int32_t
doInitQInfo
(
SQInfo
*
pQInfo
,
STSBuf
*
pTsBuf
,
void
*
tsdb
,
int32_t
vgId
,
bool
isSTableQuery
,
void
*
freeParam
,
_qinfo_free_fn_t
fn
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
...
...
@@ -4083,14 +4083,16 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
vgId
=
vgId
;
pQInfo
->
param
=
freeParam
;
pQInfo
->
fn
=
fn
;
pRuntimeEnv
->
pQuery
=
pQuery
;
pRuntimeEnv
->
pTSBuf
=
p
aram
;
pRuntimeEnv
->
pTSBuf
=
p
TsBuf
;
pRuntimeEnv
->
cur
.
vgroupIndex
=
-
1
;
pRuntimeEnv
->
stableQuery
=
isSTableQuery
;
pRuntimeEnv
->
prevGroupId
=
INT32_MIN
;
if
(
p
aram
!=
NULL
)
{
if
(
p
TsBuf
!=
NULL
)
{
int16_t
order
=
(
pQuery
->
order
.
order
==
pRuntimeEnv
->
pTSBuf
->
tsOrder
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
tsBufSetTraverseOrder
(
pRuntimeEnv
->
pTSBuf
,
order
);
}
...
...
@@ -5697,8 +5699,7 @@ static bool isValidQInfo(void *param) {
return
(
sig
==
(
uint64_t
)
pQInfo
);
}
static
int32_t
initQInfo
(
SQueryTableMsg
*
pQueryMsg
,
void
*
tsdb
,
int32_t
vgId
,
SQInfo
*
pQInfo
,
bool
isSTable
)
{
static
int32_t
initQInfo
(
SQueryTableMsg
*
pQueryMsg
,
void
*
tsdb
,
int32_t
vgId
,
SQInfo
*
pQInfo
,
bool
isSTable
,
void
*
param
,
_qinfo_free_fn_t
fn
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
...
...
@@ -5731,7 +5732,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
}
// filter the qualified
if
((
code
=
doInitQInfo
(
pQInfo
,
pTSBuf
,
tsdb
,
vgId
,
isSTable
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
doInitQInfo
(
pQInfo
,
pTSBuf
,
tsdb
,
vgId
,
isSTable
,
param
,
fn
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -5894,7 +5895,8 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qCreateQueryInfo
(
void
*
tsdb
,
int32_t
vgId
,
SQueryTableMsg
*
pQueryMsg
,
qinfo_t
*
pQInfo
)
{
int32_t
qCreateQueryInfo
(
void
*
tsdb
,
int32_t
vgId
,
SQueryTableMsg
*
pQueryMsg
,
void
*
param
,
_qinfo_free_fn_t
fn
,
qinfo_t
*
pQInfo
)
{
assert
(
pQueryMsg
!=
NULL
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -5984,7 +5986,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
goto
_over
;
}
code
=
initQInfo
(
pQueryMsg
,
tsdb
,
vgId
,
*
pQInfo
,
isSTableQuery
);
code
=
initQInfo
(
pQueryMsg
,
tsdb
,
vgId
,
*
pQInfo
,
isSTableQuery
,
param
,
fn
);
_over:
free
(
tagCond
);
...
...
@@ -6020,7 +6022,7 @@ static void doDestoryQueryInfo(SQInfo* pQInfo) {
freeQInfo
(
pQInfo
);
}
void
qDestroyQueryInfo
(
qinfo_t
qHandle
,
void
(
*
fp
)(
void
*
),
void
*
param
)
{
void
qDestroyQueryInfo
(
qinfo_t
qHandle
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qHandle
;
if
(
!
isValidQInfo
(
pQInfo
))
{
return
;
...
...
@@ -6030,11 +6032,15 @@ void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) {
qDebug
(
"QInfo:%p dec refCount, value:%d"
,
pQInfo
,
ref
);
if
(
ref
==
0
)
{
doDestoryQueryInfo
(
pQInfo
);
_qinfo_free_fn_t
fn
=
pQInfo
->
fn
;
void
*
param
=
pQInfo
->
param
;
if
(
fp
!=
NULL
)
{
fp
(
param
);
doDestoryQueryInfo
(
pQInfo
);
if
(
fn
!=
NULL
)
{
assert
(
param
!=
NULL
);
fn
(
param
);
}
}
}
...
...
@@ -6048,7 +6054,7 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p it is already killed, abort"
,
pQInfo
);
qDestroyQueryInfo
(
pQInfo
,
fp
,
param
);
qDestroyQueryInfo
(
pQInfo
);
return
;
}
...
...
@@ -6069,7 +6075,7 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
}
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
,
fp
,
param
);
qDestroyQueryInfo
(
pQInfo
);
}
int32_t
qRetrieveQueryResultInfo
(
qinfo_t
qinfo
)
{
...
...
@@ -6162,7 +6168,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return
code
;
}
int32_t
qKillQuery
(
qinfo_t
qinfo
,
void
(
*
fp
)(
void
*
),
void
*
param
)
{
int32_t
qKillQuery
(
qinfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
...
...
@@ -6170,8 +6176,7 @@ int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
}
setQueryKilled
(
pQInfo
);
qDestroyQueryInfo
(
pQInfo
,
fp
,
param
);
qDestroyQueryInfo
(
pQInfo
);
return
TSDB_CODE_SUCCESS
;
}
...
...
src/util/inc/tcache.h
浏览文件 @
9c1fcd5e
...
...
@@ -24,6 +24,8 @@ extern "C" {
#include "tref.h"
#include "hash.h"
typedef
void
(
*
__cache_freeres_fn_t
)(
void
*
);
typedef
struct
SCacheStatis
{
int64_t
missCount
;
int64_t
hitCount
;
...
...
@@ -34,14 +36,15 @@ typedef struct SCacheStatis {
typedef
struct
SCacheDataNode
{
uint64_t
addedTime
;
// the added time when this element is added or updated into cache
uint64_t
expiredTime
;
// expiredTime expiredTime when this element should be remove from cache
uint64_t
lifespan
;
// expiredTime expiredTime when this element should be remove from cache
uint64_t
signature
;
uint32_t
size
;
// allocated size for current SCacheDataNode
uint16_t
keySize
:
15
;
bool
inTrashCan
:
1
;
// denote if it is in trash or not
T_REF_DECLARE
()
char
*
key
;
char
data
[];
uint16_t
keySize
:
15
;
// max key size: 32kb
bool
inTrashCan
:
1
;
// denote if it is in trash or not
int32_t
extendFactor
;
// number of life span extend
char
*
key
;
char
data
[];
}
SCacheDataNode
;
typedef
struct
STrashElem
{
...
...
@@ -62,29 +65,32 @@ typedef struct {
int64_t
totalSize
;
// total allocated buffer in this hash table, SCacheObj is not included.
int64_t
refreshTime
;
STrashElem
*
pTrash
;
void
*
tmrCtrl
;
void
*
pTimer
;
//
void * tmrCtrl;
//
void * pTimer;
SCacheStatis
statistics
;
SHashObj
*
pHashTable
;
_
hash_free
_fn_t
freeFp
;
_
_cache_freeres
_fn_t
freeFp
;
uint32_t
numOfElemsInTrash
;
// number of element in trash
uint8_t
deleting
;
// set the deleting flag to stop refreshing ASAP.
pthread_t
refreshWorker
;
bool
extendLifespan
;
// auto extend life span when one item is accessed.
#if defined(LINUX)
pthread_rwlock_t
lock
;
#else
pthread_mutex_t
lock
;
pthread_mutex_t
lock
;
#endif
}
SCacheObj
;
/**
* initialize the cache object
* @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and
* not referenced by other objects
* @param keyType key type
* @param refreshTimeInSeconds refresh operation interval time, the maximum survival time when one element is expired
* and not referenced by other objects
* @param extendLifespan auto extend lifespan, if accessed
* @param fn free resource callback function
* @return
*/
SCacheObj
*
taosCacheInit
(
int
64_t
refreshTimeInSeconds
);
SCacheObj
*
taosCacheInit
(
int
32_t
keyType
,
int64_t
refreshTimeInSeconds
,
bool
extendLifespan
,
__cache_freeres_fn_t
fn
);
/**
* initialize the cache object and set the free object callback function
...
...
@@ -92,7 +98,7 @@ SCacheObj *taosCacheInit(int64_t refreshTimeInSeconds);
* @param freeCb
* @return
*/
SCacheObj
*
taosCacheInitWithCb
(
int
64_t
refreshTimeInSeconds
,
void
(
*
freeCb
)(
void
*
data
)
);
SCacheObj
*
taosCacheInitWithCb
(
int
32_t
keyType
,
int64_t
refreshTimeInSeconds
,
bool
extendLifespan
,
__cache_freeres_fn_t
fn
);
/**
* add data into cache
...
...
@@ -104,7 +110,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTimeInSeconds, void (*freeCb)(void
* @param keepTime survival time in second
* @return cached element
*/
void
*
taosCachePut
(
SCacheObj
*
pCacheObj
,
const
char
*
key
,
const
void
*
pData
,
size_t
dataSize
,
int
keepTimeInSeconds
);
void
*
taosCachePut
(
SCacheObj
*
pCacheObj
,
const
void
*
key
,
size_t
keyLen
,
const
void
*
pData
,
size_t
dataSize
,
int
keepTimeInSeconds
);
/**
* get data from cache
...
...
@@ -112,22 +118,23 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
* @param key key
* @return cached data or NULL
*/
void
*
taosCacheAcquireBy
Name
(
SCacheObj
*
pCacheObj
,
const
char
*
key
);
void
*
taosCacheAcquireBy
Key
(
SCacheObj
*
pCacheObj
,
const
void
*
key
,
size_t
keyLen
);
/**
* update the expire time of data in cache
* @param pCacheObj cache object
* @param key key
* @param keyLen keyLen
* @param expireTime new expire time of data
* @return
*/
void
*
taosCacheUpdateExpireTimeByName
(
SCacheObj
*
pCacheObj
,
const
char
*
key
,
uint64_t
expireTime
);
void
*
taosCacheUpdateExpireTimeByName
(
SCacheObj
*
pCacheObj
,
const
char
*
key
,
size_t
keyLen
,
uint64_t
expireTime
);
/**
* Add one reference count for the exist data, and assign this data for a new owner.
* The new owner needs to invoke the taosCacheRelease when it does not need this data anymore.
* This procedure is a faster version of taosCacheAcquireBy
Name
function, which avoids the sideeffect of the problem of
* the data is moved to trash, and taosCacheAcquireBy
Name
will fail to retrieve it again.
* This procedure is a faster version of taosCacheAcquireBy
Key
function, which avoids the sideeffect of the problem of
* the data is moved to trash, and taosCacheAcquireBy
Key
will fail to retrieve it again.
*
* @param handle
* @param data
...
...
@@ -148,8 +155,7 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data);
* if it is referenced by other object, it will be remain in cache
* @param handle cache object
* @param data not the key, actually referenced data
* @param _remove force model, reduce the ref count and move the data into
* pTrash
* @param _remove force model, reduce the ref count and move the data into pTrash
*/
void
taosCacheRelease
(
SCacheObj
*
pCacheObj
,
void
**
data
,
bool
_remove
);
...
...
src/util/src/tcache.c
浏览文件 @
9c1fcd5e
...
...
@@ -119,7 +119,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t
size
=
pNode
->
size
;
taosHashRemove
(
pCacheObj
->
pHashTable
,
pNode
->
key
,
pNode
->
keySize
);
uDebug
(
"key:%
s, is removed from cache, total:%"
PRId64
" size:%d bytes"
,
pNode
->
key
,
pCacheObj
->
totalSize
,
size
);
uDebug
(
"key:%
p, is removed from cache,total:%"
PRId64
",size:%dbytes"
,
pNode
->
key
,
pCacheObj
->
totalSize
,
size
);
if
(
pCacheObj
->
freeFp
)
pCacheObj
->
freeFp
(
pNode
->
data
);
free
(
pNode
);
}
...
...
@@ -167,7 +167,7 @@ static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode
// update the timestamp information for updated key/value
pNewNode
->
addedTime
=
taosGetTimestampMs
();
pNewNode
->
expiredTime
=
pNewNode
->
addedTime
+
duration
;
pNewNode
->
lifespan
=
duration
;
T_REF_INC
(
pNewNode
);
...
...
@@ -224,8 +224,8 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
*/
static
void
*
taosCacheRefresh
(
void
*
handle
);
SCacheObj
*
taosCacheInitWithCb
(
int
64_t
refreshTime
,
void
(
*
freeCb
)(
void
*
data
)
)
{
if
(
refreshTime
<=
0
)
{
SCacheObj
*
taosCacheInitWithCb
(
int
32_t
keyType
,
int64_t
refreshTimeInSeconds
,
bool
extendLifespan
,
__cache_freeres_fn_t
fn
)
{
if
(
refreshTime
InSeconds
<=
0
)
{
return
NULL
;
}
...
...
@@ -235,7 +235,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data))
return
NULL
;
}
pCacheObj
->
pHashTable
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
);
pCacheObj
->
pHashTable
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
keyType
),
false
);
if
(
pCacheObj
->
pHashTable
==
NULL
)
{
free
(
pCacheObj
);
uError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
...
...
@@ -243,10 +243,9 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data))
}
// set free cache node callback function for hash table
// taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
pCacheObj
->
freeFp
=
freeCb
;
pCacheObj
->
refreshTime
=
refreshTime
*
1000
;
pCacheObj
->
freeFp
=
fn
;
pCacheObj
->
refreshTime
=
refreshTimeInSeconds
*
1000
;
pCacheObj
->
extendLifespan
=
extendLifespan
;
if
(
__cache_lock_init
(
pCacheObj
)
!=
0
)
{
taosHashCleanup
(
pCacheObj
->
pHashTable
);
...
...
@@ -256,7 +255,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data))
return
NULL
;
}
pthread_attr_t
thattr
;
pthread_attr_t
thattr
=
{{
0
}}
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
...
...
@@ -266,19 +265,17 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data))
return
pCacheObj
;
}
SCacheObj
*
taosCacheInit
(
int
64_t
refreshTime
)
{
return
taosCacheInitWithCb
(
refreshTime
,
NULL
);
SCacheObj
*
taosCacheInit
(
int
32_t
keyType
,
int64_t
refreshTimeInSeconds
,
bool
extendLifespan
,
__cache_freeres_fn_t
fn
)
{
return
taosCacheInitWithCb
(
keyType
,
refreshTimeInSeconds
,
extendLifespan
,
fn
);
}
void
*
taosCachePut
(
SCacheObj
*
pCacheObj
,
const
char
*
key
,
const
void
*
pData
,
size_t
dataSize
,
int
duration
)
{
void
*
taosCachePut
(
SCacheObj
*
pCacheObj
,
const
void
*
key
,
size_t
keyLen
,
const
void
*
pData
,
size_t
dataSize
,
int
duration
)
{
SCacheDataNode
*
pNode
;
if
(
pCacheObj
==
NULL
||
pCacheObj
->
pHashTable
==
NULL
)
{
return
NULL
;
}
size_t
keyLen
=
strlen
(
key
);
__cache_wr_lock
(
pCacheObj
);
SCacheDataNode
**
pt
=
(
SCacheDataNode
**
)
taosHashGet
(
pCacheObj
->
pHashTable
,
key
,
keyLen
);
SCacheDataNode
*
pOld
=
(
pt
!=
NULL
)
?
(
*
pt
)
:
NULL
;
...
...
@@ -288,14 +285,14 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
if
(
NULL
!=
pNode
)
{
pCacheObj
->
totalSize
+=
pNode
->
size
;
uDebug
(
"key:%
s
, %p added into cache, added:%"
PRIu64
", expire:%"
PRIu64
", total:%"
PRId64
", size:%"
PRId64
" bytes"
,
key
,
pNode
,
pNode
->
addedTime
,
pNode
->
expiredTime
,
pCacheObj
->
totalSize
,
dataSize
);
uDebug
(
"key:%
p
, %p added into cache, added:%"
PRIu64
", expire:%"
PRIu64
", total:%"
PRId64
", size:%"
PRId64
" bytes"
,
key
,
pNode
,
pNode
->
addedTime
,
(
pNode
->
lifespan
*
pNode
->
extendFactor
+
pNode
->
addedTime
)
,
pCacheObj
->
totalSize
,
dataSize
);
}
else
{
uError
(
"key:%
s
, failed to added into cache, out of memory"
,
key
);
uError
(
"key:%
p
, failed to added into cache, out of memory"
,
key
);
}
}
else
{
// old data exists, update the node
pNode
=
taosUpdateCacheImpl
(
pCacheObj
,
pOld
,
key
,
keyLen
,
pData
,
dataSize
,
duration
*
1000L
);
uDebug
(
"key:%
s
, %p exist in cache, updated"
,
key
,
pNode
);
uDebug
(
"key:%
p
, %p exist in cache, updated"
,
key
,
pNode
);
}
__cache_unlock
(
pCacheObj
);
...
...
@@ -303,57 +300,65 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
return
(
pNode
!=
NULL
)
?
pNode
->
data
:
NULL
;
}
void
*
taosCacheAcquireBy
Name
(
SCacheObj
*
pCacheObj
,
const
char
*
key
)
{
void
*
taosCacheAcquireBy
Key
(
SCacheObj
*
pCacheObj
,
const
void
*
key
,
size_t
keyLen
)
{
if
(
pCacheObj
==
NULL
||
taosHashGetSize
(
pCacheObj
->
pHashTable
)
==
0
)
{
return
NULL
;
}
uint32_t
keyLen
=
(
uint32_t
)
strlen
(
key
);
__cache_rd_lock
(
pCacheObj
);
SCacheDataNode
**
ptNode
=
(
SCacheDataNode
**
)
taosHashGet
(
pCacheObj
->
pHashTable
,
key
,
keyLen
);
int32_t
ref
=
0
;
if
(
ptNode
!=
NULL
)
{
T_REF_INC
(
*
ptNode
);
ref
=
T_REF_INC
(
*
ptNode
);
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if
(
pCacheObj
->
extendLifespan
)
{
int64_t
now
=
taosGetTimestampMs
();
if
((
now
-
(
*
ptNode
)
->
addedTime
)
<
(
*
ptNode
)
->
lifespan
*
(
*
ptNode
)
->
extendFactor
)
{
(
*
ptNode
)
->
extendFactor
+=
1
;
uDebug
(
"key:%p extend life time to %"
PRId64
,
key
,
(
*
ptNode
)
->
lifespan
*
(
*
ptNode
)
->
extendFactor
+
(
*
ptNode
)
->
addedTime
);
}
}
}
__cache_unlock
(
pCacheObj
);
if
(
ptNode
!=
NULL
)
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
uDebug
(
"key:%
s, is retrieved from cache, %p refcnt:%d"
,
key
,
(
*
ptNode
),
T_REF_VAL_GET
(
*
ptNode
)
);
uDebug
(
"key:%
p, is retrieved from cache, %p refcnt:%d"
,
key
,
(
*
ptNode
),
ref
);
}
else
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
uDebug
(
"key:%
s
, not in cache, retrieved failed"
,
key
);
uDebug
(
"key:%
p
, not in cache, retrieved failed"
,
key
);
}
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
return
(
ptNode
!=
NULL
)
?
(
*
ptNode
)
->
data
:
NULL
;
}
void
*
taosCacheUpdateExpireTimeByName
(
SCacheObj
*
pCacheObj
,
const
char
*
key
,
uint64_t
expireTime
)
{
void
*
taosCacheUpdateExpireTimeByName
(
SCacheObj
*
pCacheObj
,
const
char
*
key
,
size_t
keyLen
,
uint64_t
expireTime
)
{
if
(
pCacheObj
==
NULL
||
taosHashGetSize
(
pCacheObj
->
pHashTable
)
==
0
)
{
return
NULL
;
}
uint32_t
keyLen
=
(
uint32_t
)
strlen
(
key
);
__cache_rd_lock
(
pCacheObj
);
SCacheDataNode
**
ptNode
=
(
SCacheDataNode
**
)
taosHashGet
(
pCacheObj
->
pHashTable
,
key
,
keyLen
);
if
(
ptNode
!=
NULL
)
{
T_REF_INC
(
*
ptNode
);
(
*
ptNode
)
->
expiredTime
=
expireTime
;
(
*
ptNode
)
->
extendFactor
+=
1
;
// (*ptNode)->lifespan = expireTime;
}
__cache_unlock
(
pCacheObj
);
if
(
ptNode
!=
NULL
)
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
uDebug
(
"key:%
s
, expireTime is updated in cache, %p refcnt:%d"
,
key
,
(
*
ptNode
),
T_REF_VAL_GET
(
*
ptNode
));
uDebug
(
"key:%
p
, expireTime is updated in cache, %p refcnt:%d"
,
key
,
(
*
ptNode
),
T_REF_VAL_GET
(
*
ptNode
));
}
else
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
uDebug
(
"key:%
s
, not in cache, retrieved failed"
,
key
);
uDebug
(
"key:%
p
, not in cache, retrieved failed"
,
key
);
}
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
...
...
@@ -373,7 +378,17 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
int32_t
ref
=
T_REF_INC
(
ptNode
);
uDebug
(
"%p acquired by data in cache, refcnt:%d"
,
ptNode
,
ref
)
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if
(
pCacheObj
->
extendLifespan
)
{
int64_t
now
=
taosGetTimestampMs
();
if
((
now
-
ptNode
->
addedTime
)
<
ptNode
->
lifespan
*
ptNode
->
extendFactor
)
{
ptNode
->
extendFactor
+=
1
;
uDebug
(
"key:%p extend life time to %"
PRId64
,
ptNode
,
ptNode
->
lifespan
*
ptNode
->
extendFactor
+
ptNode
->
addedTime
);
}
}
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
assert
(
ref
>=
2
);
return
data
;
...
...
@@ -408,21 +423,27 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
size_t
offset
=
offsetof
(
SCacheDataNode
,
data
);
SCacheDataNode
*
pNode
=
(
SCacheDataNode
*
)((
char
*
)(
*
data
)
-
offset
);
if
(
pNode
->
signature
!=
(
uint64_t
)
pNode
)
{
uError
(
"
%p
release invalid cache data"
,
pNode
);
uError
(
"
key:%p,
release invalid cache data"
,
pNode
);
return
;
}
*
data
=
NULL
;
int
32
_t
ref
=
T_REF_DEC
(
pNode
);
uDebug
(
"
key:%s, is released, %p refcnt:%d"
,
pNode
->
key
,
pNode
,
ref
);
int
16
_t
ref
=
T_REF_DEC
(
pNode
);
uDebug
(
"
%p data released, refcnt:%d"
,
pNode
,
ref
);
if
(
_remove
)
{
if
(
_remove
&&
(
!
pNode
->
inTrashCan
)
)
{
__cache_wr_lock
(
pCacheObj
);
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
// So we need to lock it in the first place.
taosCacheMoveToTrash
(
pCacheObj
,
pNode
);
if
(
T_REF_VAL_GET
(
pNode
)
==
0
)
{
// remove directly, if not referenced by other users
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
else
{
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
// So we need to lock it in the first place.
taosCacheMoveToTrash
(
pCacheObj
,
pNode
);
}
__cache_unlock
(
pCacheObj
);
}
}
...
...
@@ -473,11 +494,11 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *
memcpy
(
pNewNode
->
key
,
key
,
keyLen
);
pNewNode
->
addedTime
=
(
uint64_t
)
taosGetTimestampMs
();
pNewNode
->
expiredTime
=
pNewNode
->
addedTime
+
duration
;
pNewNode
->
signature
=
(
uint64_t
)
pNewNode
;
pNewNode
->
size
=
(
uint32_t
)
totalSize
;
pNewNode
->
addedTime
=
(
uint64_t
)
taosGetTimestampMs
();
pNewNode
->
lifespan
=
duration
;
pNewNode
->
extendFactor
=
1
;
pNewNode
->
signature
=
(
uint64_t
)
pNewNode
;
pNewNode
->
size
=
(
uint32_t
)
totalSize
;
return
pNewNode
;
}
...
...
@@ -501,7 +522,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pNode
->
inTrashCan
=
true
;
pCacheObj
->
numOfElemsInTrash
++
;
uDebug
(
"key:%
s
, %p move to trash, numOfElem in trash:%d"
,
pNode
->
key
,
pNode
,
pCacheObj
->
numOfElemsInTrash
);
uDebug
(
"key:%
p
, %p move to trash, numOfElem in trash:%d"
,
pNode
->
key
,
pNode
,
pCacheObj
->
numOfElemsInTrash
);
}
void
taosRemoveFromTrashCan
(
SCacheObj
*
pCacheObj
,
STrashElem
*
pElem
)
{
...
...
@@ -522,7 +543,11 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
}
pElem
->
pData
->
signature
=
0
;
if
(
pCacheObj
->
freeFp
)
pCacheObj
->
freeFp
(
pElem
->
pData
->
data
);
if
(
pCacheObj
->
freeFp
)
{
pCacheObj
->
freeFp
(
pElem
->
pData
->
data
);
}
uError
(
"-------------------free obj:%p"
,
pElem
->
pData
);
free
(
pElem
->
pData
);
free
(
pElem
);
}
...
...
@@ -549,7 +574,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
}
if
(
force
||
(
T_REF_VAL_GET
(
pElem
->
pData
)
==
0
))
{
uDebug
(
"key:%
s
, %p removed from trash. numOfElem in trash:%d"
,
pElem
->
pData
->
key
,
pElem
->
pData
,
uDebug
(
"key:%
p
, %p removed from trash. numOfElem in trash:%d"
,
pElem
->
pData
->
key
,
pElem
->
pData
,
pCacheObj
->
numOfElemsInTrash
-
1
);
STrashElem
*
p
=
pElem
;
...
...
@@ -573,7 +598,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
if
(
T_REF_VAL_GET
(
pNode
)
<=
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
else
{
uDebug
(
"key:%
s, will not remove from cache, refcnt:%d"
,
pNode
->
key
,
T_REF_VAL_GET
(
pNode
));
uDebug
(
"key:%
p, will not remove from cache, refcnt:%d"
,
pNode
->
key
,
T_REF_VAL_GET
(
pNode
));
}
}
taosHashDestroyIter
(
pIter
);
...
...
@@ -613,27 +638,32 @@ void* taosCacheRefresh(void *handle) {
// reset the count value
count
=
0
;
size_t
num
=
taosHashGetSize
(
pCacheObj
->
pHashTable
);
if
(
num
==
0
)
{
size_t
elemInHash
=
taosHashGetSize
(
pCacheObj
->
pHashTable
);
if
(
elemInHash
+
pCacheObj
->
numOfElemsInTrash
==
0
)
{
continue
;
}
uint64_t
expiredTime
=
taosGetTimestampMs
();
pCacheObj
->
statistics
.
refreshCount
++
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pCacheObj
->
pHashTable
);
// refresh data in hash table
if
(
elemInHash
>
0
)
{
int64_t
expiredTime
=
taosGetTimestampMs
();
__cache_wr_lock
(
pCacheObj
);
while
(
taosHashIterNext
(
pIter
))
{
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
if
(
pNode
->
expiredTime
<=
expiredTime
&&
T_REF_VAL_GET
(
pNode
)
<=
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pCacheObj
->
pHashTable
);
__cache_wr_lock
(
pCacheObj
);
while
(
taosHashIterNext
(
pIter
))
{
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
if
((
pNode
->
addedTime
+
pNode
->
lifespan
*
pNode
->
extendFactor
)
<=
expiredTime
&&
T_REF_VAL_GET
(
pNode
)
<=
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
}
}
__cache_unlock
(
pCacheObj
);
__cache_unlock
(
pCacheObj
);
taosHashDestroyIter
(
pIter
);
}
taosHashDestroyIter
(
pIter
);
taosTrashCanEmpty
(
pCacheObj
,
false
);
}
...
...
src/util/tests/cacheTest.cpp
浏览文件 @
9c1fcd5e
...
...
@@ -19,12 +19,12 @@ int32_t tsMaxMeterConnections = 200;
// test cache
TEST
(
testCase
,
client_cache_test
)
{
const
int32_t
REFRESH_TIME_IN_SEC
=
2
;
SCacheObj
*
tscCacheHandle
=
taosCacheInit
(
REFRESH_TIME_IN_SEC
);
SCacheObj
*
tscCacheHandle
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
REFRESH_TIME_IN_SEC
,
0
,
NULL
);
const
char
*
key1
=
"test1"
;
char
data1
[]
=
"test11"
;
char
*
cachedObj
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key1
,
data1
,
strlen
(
data1
)
+
1
,
1
);
char
*
cachedObj
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key1
,
strlen
(
key1
),
data1
,
strlen
(
data1
)
+
1
,
1
);
sleep
(
REFRESH_TIME_IN_SEC
+
1
);
printf
(
"obj is still valid: %s
\n
"
,
cachedObj
);
...
...
@@ -33,7 +33,7 @@ TEST(testCase, client_cache_test) {
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
cachedObj
,
false
);
/* the object is cleared by cache clean operation */
cachedObj
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key1
,
data2
,
strlen
(
data2
)
+
1
,
20
);
cachedObj
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key1
,
strlen
(
key1
),
data2
,
strlen
(
data2
)
+
1
,
20
);
printf
(
"after updated: %s
\n
"
,
cachedObj
);
printf
(
"start to remove data from cache
\n
"
);
...
...
@@ -43,32 +43,32 @@ TEST(testCase, client_cache_test) {
const
char
*
key3
=
"test2"
;
const
char
*
data3
=
"kkkkkkk"
;
char
*
cachedObj2
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key3
,
data3
,
strlen
(
data3
)
+
1
,
1
);
char
*
cachedObj2
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key3
,
strlen
(
key3
),
data3
,
strlen
(
data3
)
+
1
,
1
);
printf
(
"%s
\n
"
,
cachedObj2
);
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
cachedObj2
,
false
);
sleep
(
3
);
char
*
d
=
(
char
*
)
taosCacheAcquireBy
Name
(
tscCacheHandle
,
key3
);
char
*
d
=
(
char
*
)
taosCacheAcquireBy
Key
(
tscCacheHandle
,
key3
,
strlen
(
key3
)
);
// assert(d == NULL);
char
key5
[]
=
"test5"
;
char
data5
[]
=
"data5kkkkk"
;
cachedObj2
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key5
,
data5
,
strlen
(
data5
)
+
1
,
20
);
cachedObj2
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key5
,
strlen
(
key5
),
data5
,
strlen
(
data5
)
+
1
,
20
);
const
char
*
data6
=
"new Data after updated"
;
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
cachedObj2
,
false
);
cachedObj2
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key5
,
data6
,
strlen
(
data6
)
+
1
,
20
);
cachedObj2
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key5
,
strlen
(
key5
),
data6
,
strlen
(
data6
)
+
1
,
20
);
printf
(
"%s
\n
"
,
cachedObj2
);
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
cachedObj2
,
true
);
const
char
*
data7
=
"add call update procedure"
;
cachedObj2
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key5
,
data7
,
strlen
(
data7
)
+
1
,
20
);
cachedObj2
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key5
,
strlen
(
key5
),
data7
,
strlen
(
data7
)
+
1
,
20
);
printf
(
"%s
\n
=======================================
\n\n
"
,
cachedObj2
);
char
*
cc
=
(
char
*
)
taosCacheAcquireBy
Name
(
tscCacheHandle
,
key5
);
char
*
cc
=
(
char
*
)
taosCacheAcquireBy
Key
(
tscCacheHandle
,
key5
,
strlen
(
key5
)
);
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
cachedObj2
,
true
);
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
cc
,
false
);
...
...
@@ -76,7 +76,7 @@ TEST(testCase, client_cache_test) {
const
char
*
data8
=
"ttft"
;
const
char
*
key6
=
"key6"
;
char
*
ft
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key6
,
data8
,
strlen
(
data8
),
20
);
char
*
ft
=
(
char
*
)
taosCachePut
(
tscCacheHandle
,
key6
,
strlen
(
key6
),
data8
,
strlen
(
data8
),
20
);
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
ft
,
false
);
/**
...
...
@@ -85,7 +85,7 @@ TEST(testCase, client_cache_test) {
uint64_t
startTime
=
taosGetTimestampUs
();
printf
(
"Cache Performance Test
\n
start time:%"
PRIu64
"
\n
"
,
startTime
);
for
(
int32_t
i
=
0
;
i
<
1000
;
++
i
)
{
char
*
dd
=
(
char
*
)
taosCacheAcquireBy
Name
(
tscCacheHandle
,
key6
);
char
*
dd
=
(
char
*
)
taosCacheAcquireBy
Key
(
tscCacheHandle
,
key6
,
strlen
(
key6
)
);
if
(
dd
!=
NULL
)
{
// printf("get the data\n");
}
else
{
...
...
@@ -105,7 +105,7 @@ TEST(testCase, client_cache_test) {
TEST
(
testCase
,
cache_resize_test
)
{
const
int32_t
REFRESH_TIME_IN_SEC
=
2
;
auto
*
pCache
=
taosCacheInit
(
REFRESH_TIME_IN_SEC
);
auto
*
pCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
REFRESH_TIME_IN_SEC
,
false
,
NULL
);
char
key
[
256
]
=
{
0
};
char
data
[
1024
]
=
"abcdefghijk"
;
...
...
@@ -116,7 +116,7 @@ TEST(testCase, cache_resize_test) {
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
int32_t
len
=
sprintf
(
key
,
"abc_%7d"
,
i
);
taosCachePut
(
pCache
,
key
,
data
,
len
,
3600
);
taosCachePut
(
pCache
,
key
,
strlen
(
key
),
data
,
len
,
3600
);
}
uint64_t
endTime
=
taosGetTimestampUs
();
...
...
@@ -125,7 +125,7 @@ TEST(testCase, cache_resize_test) {
startTime
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
int32_t
len
=
sprintf
(
key
,
"abc_%7d"
,
i
);
void
*
k
=
taosCacheAcquireBy
Name
(
pCache
,
key
);
void
*
k
=
taosCacheAcquireBy
Key
(
pCache
,
key
,
len
);
assert
(
k
!=
0
);
}
endTime
=
taosGetTimestampUs
();
...
...
src/vnode/inc/vnodeInt.h
浏览文件 @
9c1fcd5e
...
...
@@ -53,6 +53,7 @@ typedef struct {
STsdbCfg
tsdbCfg
;
SSyncCfg
syncCfg
;
SWalCfg
walCfg
;
void
*
qHandlePool
;
// query handle pool
char
*
rootDir
;
char
db
[
TSDB_DB_NAME_LEN
];
}
SVnodeObj
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
9c1fcd5e
...
...
@@ -15,19 +15,22 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tcache.h"
#include "cJSON.h"
#include "dnode.h"
#include "hash.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "t
uti
l.h"
#include "t
globa
l.h"
#include "trpc.h"
#include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
#include "cJSON.h"
#include "tglobal.h"
#include "dnode.h"
#include "tutil.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "query.h"
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
...
...
@@ -43,6 +46,7 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
);
static
void
vnodeFreeqHandle
(
void
*
phandle
);
static
pthread_once_t
vnodeModuleInit
=
PTHREAD_ONCE_INIT
;
...
...
@@ -279,6 +283,9 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if
(
pVnode
->
role
==
TAOS_SYNC_ROLE_MASTER
)
cqStart
(
pVnode
->
cq
);
const
int32_t
REFRESH_HANDLE_INTERVAL
=
2
;
// every 2 seconds, rfresh handle pool
pVnode
->
qHandlePool
=
taosCacheInit
(
TSDB_DATA_TYPE_BIGINT
,
REFRESH_HANDLE_INTERVAL
,
true
,
vnodeFreeqHandle
);
pVnode
->
events
=
NULL
;
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
vDebug
(
"vgId:%d, vnode is opened in %s, pVnode:%p"
,
pVnode
->
vgId
,
rootDir
,
pVnode
);
...
...
@@ -848,12 +855,12 @@ static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
goto
PARSE_OVER
;
}
cJSON
*
ver
sion
=
cJSON_GetObjectItem
(
root
,
"version"
);
if
(
!
ver
sion
||
version
->
type
!=
cJSON_Number
)
{
cJSON
*
ver
=
cJSON_GetObjectItem
(
root
,
"version"
);
if
(
!
ver
||
ver
->
type
!=
cJSON_Number
)
{
vError
(
"vgId:%d, failed to read vnode version, version not found"
,
pVnode
->
vgId
);
goto
PARSE_OVER
;
}
pVnode
->
version
=
ver
sion
->
valueint
;
pVnode
->
version
=
ver
->
valueint
;
terrno
=
TSDB_CODE_SUCCESS
;
vInfo
(
"vgId:%d, read vnode version successfully, version:%"
PRId64
,
pVnode
->
vgId
,
pVnode
->
version
);
...
...
@@ -864,3 +871,12 @@ PARSE_OVER:
if
(
fp
)
fclose
(
fp
);
return
terrno
;
}
void
vnodeFreeqHandle
(
void
*
qHandle
)
{
void
**
handle
=
qHandle
;
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
return
;
}
qKillQuery
(
*
handle
);
}
\ No newline at end of file
src/vnode/src/vnodeRead.c
浏览文件 @
9c1fcd5e
...
...
@@ -15,20 +15,21 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "taoserror.h"
#include "tqueue.h"
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "tdataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "query.h"
static
int32_t
(
*
vnodeProcessReadMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
);
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
);
static
int32_t
vnodeProcessFetchMsg
(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
);
static
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
void
*
qhandle
,
int32_t
vgId
);
void
vnodeInitReadFp
(
void
)
{
vnodeProcessReadMsgFp
[
TSDB_MSG_TYPE_QUERY
]
=
vnodeProcessQueryMsg
;
...
...
@@ -58,19 +59,6 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return
(
*
vnodeProcessReadMsgFp
[
msgType
])(
pVnode
,
pReadMsg
);
}
// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
static
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
void
*
qhandle
,
int32_t
vgId
)
{
SRetrieveTableMsg
*
killQueryMsg
=
rpcMallocCont
(
sizeof
(
SRetrieveTableMsg
));
killQueryMsg
->
qhandle
=
htobe64
((
uint64_t
)
qhandle
);
killQueryMsg
->
free
=
htons
(
1
);
killQueryMsg
->
header
.
vgId
=
htonl
(
vgId
);
killQueryMsg
->
header
.
contLen
=
htonl
(
sizeof
(
SRetrieveTableMsg
));
vDebug
(
"QInfo:%p register qhandle to connect:%p"
,
qhandle
,
handle
);
return
rpcReportProgress
(
handle
,
(
char
*
)
killQueryMsg
,
sizeof
(
SRetrieveTableMsg
));
}
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
)
{
void
*
pCont
=
pReadMsg
->
pCont
;
int32_t
contLen
=
pReadMsg
->
contLen
;
...
...
@@ -88,18 +76,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vWarn
(
"QInfo:%p connection %p broken, kill query"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pReadMsg
->
rpcMsg
.
handle
);
assert
(
pReadMsg
->
rpcMsg
.
contLen
>
0
&&
killQueryMsg
->
free
==
1
);
// this message arrived here by means of the
query
message, so release the vnode is necessary
qKillQuery
((
qinfo_t
)
killQueryMsg
->
qhandle
,
vnodeRelease
,
pVnode
);
vnodeRelease
(
pVnode
);
// this message arrived here by means of the
*query*
message, so release the vnode is necessary
void
**
qhandle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
(
void
*
)
&
killQueryMsg
->
qhandle
,
sizeof
(
killQueryMsg
->
qhandle
)
);
if
(
qhandle
==
NULL
||
*
qhandle
==
NULL
)
{
// todo handle invalid qhandle error
}
else
{
// qKillQuery((qinfo_t) killQueryMsg->qhandle);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
qhandle
,
true
);
}
vnodeRelease
(
pVnode
);
return
TSDB_CODE_TSC_QUERY_CANCELLED
;
// todo change the error code
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
qinfo_t
pQInfo
=
NULL
;
void
**
handle
=
NULL
;
if
(
contLen
!=
0
)
{
code
=
qCreateQueryInfo
(
pVnode
->
tsdb
,
pVnode
->
vgId
,
pQueryTableMsg
,
&
pQInfo
);
code
=
qCreateQueryInfo
(
pVnode
->
tsdb
,
pVnode
->
vgId
,
pQueryTableMsg
,
pVnode
,
vnodeRelease
,
&
pQInfo
);
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
pQInfo
));
...
...
@@ -116,13 +111,15 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
// NOTE: there two refcount, needs to kill twice, todo refactor
qKillQuery
(
pQInfo
,
vnodeRelease
,
pVnode
);
qKillQuery
(
pQInfo
,
vnodeRelease
,
pVnode
);
// query has not been put into qhandle pool, kill it directly.
qKillQuery
(
pQInfo
);
qKillQuery
(
pQInfo
);
return
pRsp
->
code
;
}
vTrace
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
pVnode
->
vgId
,
pQInfo
);
handle
=
taosCachePut
(
pVnode
->
qHandlePool
,
pQInfo
,
sizeof
(
pQInfo
),
&
pQInfo
,
sizeof
(
pQInfo
),
tsShellActivityTimer
*
2
);
assert
(
*
handle
==
pQInfo
);
}
else
{
assert
(
pQInfo
==
NULL
);
vnodeRelease
(
pVnode
);
...
...
@@ -138,6 +135,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if
(
pQInfo
!=
NULL
)
{
qTableQuery
(
pQInfo
,
vnodeRelease
,
pVnode
);
// do execute query
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
false
);
}
return
code
;
...
...
@@ -152,10 +150,18 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
int32_t
ret
=
0
;
void
**
handle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
&
pQInfo
,
sizeof
(
pQInfo
));
if
(
handle
==
NULL
||
*
handle
!=
pQInfo
)
{
ret
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
if
(
pRetrieve
->
free
==
1
)
{
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pQInfo
);
int32_t
ret
=
qKillQuery
(
pQInfo
,
vnodeRelease
,
pVnode
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
handle
,
true
);
// int32_t ret = qKillQuery(pQInfo);
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
...
...
@@ -184,10 +190,24 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRet
->
qhandle
=
pQInfo
;
code
=
TSDB_CODE_VND_ACTION_NEED_REPROCESSED
;
}
else
{
// no further execution invoked, release the ref to vnode
qDestroyQueryInfo
(
pQInfo
,
vnodeRelease
,
pVnode
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
true
);
// qDestroyQueryInfo(pQInfo);
}
}
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is disposed"
,
pVnode
->
vgId
,
pQInfo
);
return
code
;
}
// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
void
*
qhandle
,
int32_t
vgId
)
{
SRetrieveTableMsg
*
killQueryMsg
=
rpcMallocCont
(
sizeof
(
SRetrieveTableMsg
));
killQueryMsg
->
qhandle
=
htobe64
((
uint64_t
)
qhandle
);
killQueryMsg
->
free
=
htons
(
1
);
killQueryMsg
->
header
.
vgId
=
htonl
(
vgId
);
killQueryMsg
->
header
.
contLen
=
htonl
(
sizeof
(
SRetrieveTableMsg
));
vDebug
(
"QInfo:%p register qhandle to connect:%p"
,
qhandle
,
handle
);
return
rpcReportProgress
(
handle
,
(
char
*
)
killQueryMsg
,
sizeof
(
SRetrieveTableMsg
));
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录