Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ce63ad05
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ce63ad05
编写于
4月 22, 2022
作者:
L
Liu Jicong
提交者:
GitHub
4月 22, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11770 from taosdata/feature/tq
feat(tmq): add push mode
上级
78f05459
aade540f
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
432 addition
and
341 deletion
+432
-341
example/src/tmq.c
example/src/tmq.c
+0
-2
include/client/taos.h
include/client/taos.h
+2
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+0
-4
include/os/osSemaphore.h
include/os/osSemaphore.h
+26
-19
include/util/tcache.h
include/util/tcache.h
+8
-8
source/client/src/tmq.c
source/client/src/tmq.c
+11
-1
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+0
-3
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+0
-4
source/dnode/mnode/impl/src/mndPerfSchema.c
source/dnode/mnode/impl/src/mndPerfSchema.c
+2
-0
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+13
-12
source/dnode/mnode/impl/src/mndShow.c
source/dnode/mnode/impl/src/mndShow.c
+1
-1
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+2
-2
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+27
-46
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+169
-9
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+0
-70
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+0
-15
source/os/src/osSemaphore.c
source/os/src/osSemaphore.c
+52
-26
source/util/src/tcache.c
source/util/src/tcache.c
+74
-76
source/util/test/cacheTest.cpp
source/util/test/cacheTest.cpp
+44
-42
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
未找到文件。
example/src/tmq.c
浏览文件 @
ce63ad05
...
...
@@ -101,8 +101,6 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
include/client/taos.h
浏览文件 @
ce63ad05
...
...
@@ -222,6 +222,8 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
DLL_EXPORT
tmq_list_t
*
tmq_list_new
();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
const
char
*
);
DLL_EXPORT
void
tmq_list_destroy
(
tmq_list_t
*
);
DLL_EXPORT
int32_t
tmq_list_get_size
(
const
tmq_list_t
*
);
DLL_EXPORT
char
**
tmq_list_to_c_array
(
const
tmq_list_t
*
);
#if 0
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
...
...
include/common/tmsgdef.h
浏览文件 @
ce63ad05
...
...
@@ -175,11 +175,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_QUERY
,
"vnode-mq-query"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_CONNECT
,
"vnode-mq-connect"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_DISCONNECT
,
"vnode-mq-disconnect"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_SET_CONN
,
"vnode-mq-set-conn"
,
SMqSetCVgReq
,
SMqSetCVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_REB
,
"vnode-mq-mv-rebalance"
,
SMqMVRebReq
,
SMqMVRebRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_CANCEL_CONN
,
"vnode-mq-mv-cancel-conn"
,
SMqCancelConnReq
,
SMqCancelConnRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_VG_CHANGE
,
"vnode-mq-vg-change"
,
SMqRebVgReq
,
SMqRebVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_SET_CUR
,
"vnode-mq-set-cur"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_RES_READY
,
"vnode-res-ready"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASKS_STATUS
,
"vnode-tasks-status"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_TASK
,
"vnode-cancel-task"
,
NULL
,
NULL
)
...
...
include/os/osSemaphore.h
浏览文件 @
ce63ad05
...
...
@@ -22,21 +22,28 @@ extern "C" {
#include <semaphore.h>
#if defined (_TD_DARWIN_64)
typedef
struct
tsem_s
*
tsem_t
;
int
tsem_init
(
tsem_t
*
sem
,
int
pshared
,
unsigned
int
value
);
int
tsem_wait
(
tsem_t
*
sem
);
int
tsem_post
(
tsem_t
*
sem
);
int
tsem_destroy
(
tsem_t
*
sem
);
#if defined(_TD_DARWIN_64)
typedef
struct
tsem_s
*
tsem_t
;
int
tsem_init
(
tsem_t
*
sem
,
int
pshared
,
unsigned
int
value
);
int
tsem_wait
(
tsem_t
*
sem
);
int
tsem_timewait
(
tsem_t
*
sim
,
int64_t
nanosecs
);
int
tsem_post
(
tsem_t
*
sem
);
int
tsem_destroy
(
tsem_t
*
sem
);
#else
#define tsem_t sem_t
#define tsem_init sem_init
int
tsem_wait
(
tsem_t
*
sem
);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
#define tsem_t sem_t
#define tsem_init sem_init
int
tsem_wait
(
tsem_t
*
sem
);
int
tsem_timewait
(
tsem_t
*
sim
,
int64_t
nanosecs
);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
#endif
#if defined
(_TD_DARWIN_64)
#if defined(_TD_DARWIN_64)
// #define TdThreadRwlock TdThreadMutex
// #define taosThreadRwlockInit(lock, NULL) taosThreadMutexInit(lock, NULL)
// #define taosThreadRwlockDestroy(lock) taosThreadMutexDestroy(lock)
...
...
@@ -44,20 +51,20 @@ extern "C" {
// #define taosThreadRwlockRdlock(lock) taosThreadMutexLock(lock)
// #define taosThreadRwlockUnlock(lock) taosThreadMutexUnlock(lock)
#define TdThreadSpinlock
TdThreadMutex
#define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL)
#define taosThreadSpinDestroy(lock)
taosThreadMutexDestroy(lock)
#define taosThreadSpinLock(lock)
taosThreadMutexLock(lock)
#define taosThreadSpinUnlock(lock)
taosThreadMutexUnlock(lock)
#define TdThreadSpinlock
TdThreadMutex
#define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL)
#define taosThreadSpinDestroy(lock)
taosThreadMutexDestroy(lock)
#define taosThreadSpinLock(lock)
taosThreadMutexLock(lock)
#define taosThreadSpinUnlock(lock)
taosThreadMutexUnlock(lock)
#endif
bool
taosCheckPthreadValid
(
TdThread
thread
);
int64_t
taosGetSelfPthreadId
();
int64_t
taosGetPthreadId
(
TdThread
thread
);
void
taosResetPthread
(
TdThread
*
thread
);
void
taosResetPthread
(
TdThread
*
thread
);
bool
taosComparePthread
(
TdThread
first
,
TdThread
second
);
int32_t
taosGetPId
();
int32_t
taosGetAppName
(
char
*
name
,
int32_t
*
len
);
int32_t
taosGetAppName
(
char
*
name
,
int32_t
*
len
);
#ifdef __cplusplus
}
...
...
include/util/tcache.h
浏览文件 @
ce63ad05
...
...
@@ -47,13 +47,13 @@ typedef struct STrashElem STrashElem;
/**
* initialize the cache object
* @param keyType key type
* @param refreshTimeIn
Seconds
refresh operation interval time, the maximum survival time when one element is expired
* @param refreshTimeIn
Ms
refresh operation interval time, the maximum survival time when one element is expired
* and not referenced by other objects
* @param extendLifespan auto extend lifespan, if accessed
* @param fn free resource callback function
* @return
*/
SCacheObj
*
taosCacheInit
(
int32_t
keyType
,
int64_t
refreshTimeIn
Second
s
,
bool
extendLifespan
,
__cache_free_fn_t
fn
,
SCacheObj
*
taosCacheInit
(
int32_t
keyType
,
int64_t
refreshTimeIn
M
s
,
bool
extendLifespan
,
__cache_free_fn_t
fn
,
const
char
*
cacheName
);
/**
...
...
@@ -111,7 +111,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
* @param pCacheObj
* @return
*/
size_t
taosCacheGetNumOfObj
(
const
SCacheObj
*
pCacheObj
);
size_t
taosCacheGetNumOfObj
(
const
SCacheObj
*
pCacheObj
);
/**
* move all data node into trash, clear node in trash can if it is not referenced by any clients
...
...
@@ -145,11 +145,11 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1);
*/
void
taosStopCacheRefreshWorker
();
SCacheIter
*
taosCacheCreateIter
(
const
SCacheObj
*
pCacheObj
);
bool
taosCacheIterNext
(
SCacheIter
*
pIter
);
void
*
taosCacheIterGetData
(
const
SCacheIter
*
pIter
,
size_t
*
dataLen
);
void
*
taosCacheIterGetKey
(
const
SCacheIter
*
pIter
,
size_t
*
keyLen
);
void
taosCacheDestroyIter
(
SCacheIter
*
pIter
);
SCacheIter
*
taosCacheCreateIter
(
const
SCacheObj
*
pCacheObj
);
bool
taosCacheIterNext
(
SCacheIter
*
pIter
);
void
*
taosCacheIterGetData
(
const
SCacheIter
*
pIter
,
size_t
*
dataLen
);
void
*
taosCacheIterGetKey
(
const
SCacheIter
*
pIter
,
size_t
*
keyLen
);
void
taosCacheDestroyIter
(
SCacheIter
*
pIter
);
#ifdef __cplusplus
}
...
...
source/client/src/tmq.c
浏览文件 @
ce63ad05
...
...
@@ -260,6 +260,16 @@ void tmq_list_destroy(tmq_list_t* list) {
taosArrayDestroy
(
container
);
}
int32_t
tmq_list_get_size
(
const
tmq_list_t
*
list
)
{
const
SArray
*
container
=
&
list
->
container
;
return
taosArrayGetSize
(
container
);
}
char
**
tmq_list_to_c_array
(
const
tmq_list_t
*
list
)
{
const
SArray
*
container
=
&
list
->
container
;
return
container
->
pData
;
}
static
int32_t
tmqMakeTopicVgKey
(
char
*
dst
,
const
char
*
topicName
,
int32_t
vg
)
{
return
sprintf
(
dst
,
"%s:%d"
,
topicName
,
vg
);
}
...
...
@@ -387,7 +397,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq
->
commit_cb
=
conf
->
commit_cb
;
pTmq
->
resetOffsetCfg
=
conf
->
resetOffset
;
pTmq
->
consumerId
=
generateRequestId
()
&
(((
uint64_t
)
-
1
)
>>
1
);
pTmq
->
consumerId
=
tGenIdPI64
(
);
pTmq
->
clientTopics
=
taosArrayInit
(
0
,
sizeof
(
SMqClientTopic
));
if
(
pTmq
->
clientTopics
==
NULL
)
{
taosMemoryFree
(
pTmq
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
ce63ad05
...
...
@@ -218,9 +218,6 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_GET_INDEX
,
mmProcessReadMsg
,
DEFAULT_HANDLE
);
// Requests handled by VNODE
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CONN_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_REB_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_CANCEL_CONN_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CREATE_STB_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_ALTER_STB_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_STB_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
ce63ad05
...
...
@@ -276,10 +276,6 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CREATE_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CANCEL_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, DEFAULT_HANDLE);
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_VG_CHANGE
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CONSUME
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mnode/impl/src/mndPerfSchema.c
浏览文件 @
ce63ad05
...
...
@@ -49,6 +49,7 @@ static const SPerfsTableSchema topicSchema[] = {
static
const
SPerfsTableSchema
consumerSchema
[]
=
{
{.
name
=
"client_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"app_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"group_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"pid"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"status"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
...
...
@@ -61,6 +62,7 @@ static const SPerfsTableSchema subscribeSchema[] = {
{.
name
=
"topic_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"group_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"vgroup_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"offset"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"client_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
};
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
ce63ad05
...
...
@@ -58,7 +58,8 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
int32_t
mndInitProfile
(
SMnode
*
pMnode
)
{
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
int32_t
connCheckTime
=
tsShellActivityTimer
*
2
;
// in ms
int32_t
connCheckTime
=
tsShellActivityTimer
*
2
*
1000
;
pMgmt
->
cache
=
taosCacheInit
(
TSDB_DATA_TYPE_INT
,
connCheckTime
,
true
,
(
__cache_free_fn_t
)
mndFreeConn
,
"conn"
);
if
(
pMgmt
->
cache
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -174,10 +175,10 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
}
static
int32_t
mndProcessConnectReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SUserObj
*
pUser
=
NULL
;
SDbObj
*
pDb
=
NULL
;
SConnObj
*
pConn
=
NULL
;
SMnode
*
pMnode
=
pReq
->
pNode
;
SUserObj
*
pUser
=
NULL
;
SDbObj
*
pDb
=
NULL
;
SConnObj
*
pConn
=
NULL
;
int32_t
code
=
-
1
;
SConnectReq
connReq
=
{
0
};
char
ip
[
30
]
=
{
0
};
...
...
@@ -464,7 +465,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
taosArrayDestroyEx
(
batchReq
.
reqs
,
tFreeClientHbReq
);
int32_t
tlen
=
tSerializeSClientHbBatchRsp
(
NULL
,
0
,
&
batchRsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
void
*
buf
=
rpcMallocCont
(
tlen
);
tSerializeSClientHbBatchRsp
(
buf
,
tlen
,
&
batchRsp
);
int32_t
rspNum
=
(
int32_t
)
taosArrayGetSize
(
batchRsp
.
rsps
);
...
...
@@ -486,7 +487,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
}
static
int32_t
mndProcessKillQueryReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SMnode
*
pMnode
=
pReq
->
pNode
;
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
SUserObj
*
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
...
...
@@ -520,7 +521,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
}
static
int32_t
mndProcessKillConnReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SMnode
*
pMnode
=
pReq
->
pNode
;
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
SUserObj
*
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
...
...
@@ -552,11 +553,11 @@ static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
}
static
int32_t
mndRetrieveConns
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
numOfRows
=
0
;
SConnObj
*
pConn
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
*
pWrite
;
char
ipStr
[
TSDB_IPv4ADDR_LEN
+
6
];
if
(
pShow
->
pIter
==
NULL
)
{
...
...
@@ -611,8 +612,8 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int
}
static
int32_t
mndRetrieveQueries
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
numOfRows
=
0
;
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
numOfRows
=
0
;
#if 0
SConnObj *pConn = NULL;
int32_t cols = 0;
...
...
source/dnode/mnode/impl/src/mndShow.c
浏览文件 @
ce63ad05
...
...
@@ -28,7 +28,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
int32_t
mndInitShow
(
SMnode
*
pMnode
)
{
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
pMgmt
->
cache
=
taosCacheInit
(
TSDB_DATA_TYPE_INT
,
5
,
true
,
(
__cache_free_fn_t
)
mndFreeShowObj
,
"show"
);
pMgmt
->
cache
=
taosCacheInit
(
TSDB_DATA_TYPE_INT
,
5
000
,
true
,
(
__cache_free_fn_t
)
mndFreeShowObj
,
"show"
);
if
(
pMgmt
->
cache
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"failed to alloc show cache since %s"
,
terrstr
());
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
ce63ad05
...
...
@@ -534,8 +534,8 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pTopic
->
createTime
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
char
*
sql
=
taosMemoryCalloc
(
1
,
strlen
(
pTopic
->
sql
)
+
1
+
VARSTR_HEADER_SIZE
)
;
strcpy
(
&
sql
[
VARSTR_HEADER_SIZE
],
pTopic
->
sql
);
char
sql
[
TSDB_SHOW_SQL_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
}
;
tstrncpy
(
&
sql
[
VARSTR_HEADER_SIZE
],
pTopic
->
sql
,
TSDB_SHOW_SQL_LEN
);
varDataSetLen
(
sql
,
strlen
(
&
sql
[
VARSTR_HEADER_SIZE
]));
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
sql
,
false
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
ce63ad05
...
...
@@ -18,8 +18,10 @@
#include "executor.h"
#include "os.h"
#include "tcache.h"
#include "thash.h"
#include "tmsg.h"
#include "trpc.h"
#include "ttimer.h"
#include "wal.h"
...
...
@@ -81,7 +83,6 @@ typedef struct STqOffsetStore STqOffsetStore;
struct
STqReadHandle
{
int64_t
ver
;
int64_t
tbUid
;
SHashObj
*
tbIdHash
;
const
SSubmitReq
*
pMsg
;
SSubmitBlk
*
pBlock
;
...
...
@@ -142,26 +143,37 @@ typedef struct {
}
STqMetaStore
;
typedef
struct
{
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
int64_t
consumerId
;
int32_t
epoch
;
int8_t
subType
;
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
char
*
qmsg
;
int64_t
consumerId
;
int32_t
epoch
;
int32_t
skipLogNum
;
int64_t
reqOffset
;
SRWLatch
lock
;
SRpcMsg
*
handle
;
}
STqPushHandle
;
typedef
struct
{
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
int64_t
consumerId
;
int32_t
epoch
;
int8_t
subType
;
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
char
*
qmsg
;
STqPushHandle
pushHandle
;
// SRWLatch lock;
SWalReadHandle
*
pWalReader
;
//
number should be identical to fetch thread num
STqReadHandle
*
p
StreamReader
[
4
];
qTaskInfo_t
task
[
4
];
//
task number should be the same with fetch thread
STqReadHandle
*
p
ExecReader
[
5
];
qTaskInfo_t
task
[
5
];
}
STqExec
;
struct
STQ
{
char
*
path
;
// STqMetaStore* tqMeta;
SHashObj
*
execs
;
// subKey -> tqExec
SHashObj
*
pushMgr
;
// consumerId -> STqExec*
SHashObj
*
execs
;
// subKey -> STqExec
SHashObj
*
pStreamTasks
;
SVnode
*
pVnode
;
SWal
*
pWal
;
...
...
@@ -205,20 +217,6 @@ typedef struct {
SArray
*
topics
;
// SArray<STqTopic>
}
STqConsumer
;
enum
{
TQ_PUSHER_TYPE__CLIENT
=
1
,
TQ_PUSHER_TYPE__STREAM
,
};
typedef
struct
{
int8_t
type
;
int8_t
reserved
[
3
];
int32_t
ttl
;
int64_t
consumerId
;
SRpcMsg
*
pMsg
;
// SMqPollRsp* rsp;
}
STqClientPusher
;
typedef
struct
{
int8_t
type
;
int8_t
nodeType
;
...
...
@@ -228,10 +226,6 @@ typedef struct {
// TODO sync function
}
STqStreamPusher
;
typedef
struct
{
SHashObj
*
pHash
;
// <id, STqPush*>
}
STqPushMgr
;
typedef
struct
{
int8_t
inited
;
tmr_h
timer
;
...
...
@@ -247,14 +241,11 @@ void tqCleanUp();
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
);
void
tqClose
(
STQ
*
);
// required by vnode
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
sion
);
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
);
int
tqCommit
(
STQ
*
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessVgChangeReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
// int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
// int32_t tqProcessRebReq(STQ* pTq, char* msg);
// int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
...
...
@@ -295,16 +286,6 @@ int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t
int32_t
tqOffsetPersist
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
);
int32_t
tqOffsetPersistAll
(
STqOffsetStore
*
pStore
);
// tqPush
int32_t
tqPushMgrInit
();
void
tqPushMgrCleanUp
();
STqPushMgr
*
tqPushMgrOpen
();
void
tqPushMgrClose
(
STqPushMgr
*
pushMgr
);
STqClientPusher
*
tqAddClientPusher
(
STqPushMgr
*
pushMgr
,
SRpcMsg
*
pMsg
,
int64_t
consumerId
,
int64_t
ttl
);
STqStreamPusher
*
tqAddStreamPusher
(
STqPushMgr
*
pushMgr
,
int64_t
streamId
,
SEpSet
*
pEpSet
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
ce63ad05
...
...
@@ -15,9 +15,12 @@
#include "vnodeInt.h"
int32_t
tqInit
()
{
return
tqPushMgrInit
();
}
int32_t
tqInit
()
{
//
return
0
;
}
void
tqCleanUp
()
{
tqPushMgrCleanUp
();
}
void
tqCleanUp
()
{}
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
)
{
STQ
*
pTq
=
taosMemoryMalloc
(
sizeof
(
STQ
));
...
...
@@ -41,6 +44,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq
->
pStreamTasks
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
pTq
->
pushMgr
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
return
pTq
;
}
...
...
@@ -52,8 +57,139 @@ void tqClose(STQ* pTq) {
// TODO
}
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
version
)
{
int32_t
tqPushMsgNew
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
void
*
pIter
=
NULL
;
STqExec
*
pExec
=
NULL
;
SSubmitReq
*
pReq
=
(
SSubmitReq
*
)
msg
;
int32_t
workerId
=
4
;
int64_t
fetchOffset
=
ver
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pushMgr
,
pIter
);
if
(
pIter
==
NULL
)
break
;
pExec
=
*
(
STqExec
**
)
pIter
;
taosWLockLatch
(
&
pExec
->
pushHandle
.
lock
);
SRpcMsg
*
pMsg
=
atomic_load_ptr
(
&
pExec
->
pushHandle
.
handle
);
ASSERT
(
pMsg
);
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pExec
->
pushHandle
.
reqOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
qTaskInfo_t
task
=
pExec
->
task
[
workerId
];
ASSERT
(
task
);
qSetStreamInput
(
task
,
pReq
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
}
if
(
pDataBlock
==
NULL
)
break
;
ASSERT
(
pDataBlock
->
info
.
rows
!=
0
);
ASSERT
(
pDataBlock
->
info
.
numOfCols
!=
0
);
int32_t
dataStrLen
=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pDataBlock
);
void
*
buf
=
taosMemoryCalloc
(
1
,
dataStrLen
);
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
buf
;
pRetrieve
->
useconds
=
ts
;
pRetrieve
->
precision
=
TSDB_DEFAULT_PRECISION
;
pRetrieve
->
compressed
=
0
;
pRetrieve
->
completed
=
1
;
pRetrieve
->
numOfRows
=
htonl
(
pDataBlock
->
info
.
rows
);
// TODO enable compress
int32_t
actualLen
=
0
;
blockCompressEncode
(
pDataBlock
,
pRetrieve
->
data
,
&
actualLen
,
pDataBlock
->
info
.
numOfCols
,
false
);
actualLen
+=
sizeof
(
SRetrieveTableRsp
);
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
rsp
.
blockDataLen
,
&
actualLen
);
taosArrayPush
(
rsp
.
blockData
,
&
buf
);
rsp
.
blockNum
++
;
}
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
STqReadHandle
*
pReader
=
pExec
->
pExecReader
[
workerId
];
tqReadHandleSetMsg
(
pReader
,
pReq
,
0
);
while
(
tqNextDataBlock
(
pReader
))
{
SSDataBlock
block
=
{
0
};
if
(
tqRetrieveDataBlock
(
&
block
.
pDataBlock
,
pReader
,
&
block
.
info
.
groupId
,
&
block
.
info
.
rows
,
&
block
.
info
.
numOfCols
)
<
0
)
{
ASSERT
(
0
);
}
int32_t
dataStrLen
=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
&
block
);
void
*
buf
=
taosMemoryCalloc
(
1
,
dataStrLen
);
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
buf
;
/*pRetrieve->useconds = 0;*/
pRetrieve
->
precision
=
TSDB_DEFAULT_PRECISION
;
pRetrieve
->
compressed
=
0
;
pRetrieve
->
completed
=
1
;
pRetrieve
->
numOfRows
=
htonl
(
block
.
info
.
rows
);
// TODO enable compress
int32_t
actualLen
=
0
;
blockCompressEncode
(
&
block
,
pRetrieve
->
data
,
&
actualLen
,
block
.
info
.
numOfCols
,
false
);
actualLen
+=
sizeof
(
SRetrieveTableRsp
);
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
rsp
.
blockDataLen
,
&
actualLen
);
taosArrayPush
(
rsp
.
blockData
,
&
buf
);
rsp
.
blockNum
++
;
}
}
else
{
ASSERT
(
0
);
}
if
(
rsp
.
blockNum
==
0
)
{
taosWUnLockLatch
(
&
pExec
->
pushHandle
.
lock
);
continue
;
}
ASSERT
(
taosArrayGetSize
(
rsp
.
blockData
)
==
rsp
.
blockNum
);
ASSERT
(
taosArrayGetSize
(
rsp
.
blockDataLen
)
==
rsp
.
blockNum
);
rsp
.
rspOffset
=
fetchOffset
;
int32_t
tlen
=
sizeof
(
SMqRspHead
)
+
tEncodeSMqDataBlkRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
pMsg
->
code
=
-
1
;
return
-
1
;
}
((
SMqRspHead
*
)
buf
)
->
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
((
SMqRspHead
*
)
buf
)
->
epoch
=
pExec
->
pushHandle
.
epoch
;
((
SMqRspHead
*
)
buf
)
->
consumerId
=
pExec
->
pushHandle
.
consumerId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqDataBlkRsp
(
&
abuf
,
&
rsp
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
atomic_store_ptr
(
&
pExec
->
pushHandle
.
handle
,
NULL
);
taosWUnLockLatch
(
&
pExec
->
pushHandle
.
lock
);
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pExec
->
pushHandle
.
consumerId
,
pExec
->
pushHandle
.
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
// TODO destroy
taosArrayDestroy
(
rsp
.
blockData
);
taosArrayDestroy
(
rsp
.
blockDataLen
);
}
return
0
;
}
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
return
-
1
;
...
...
@@ -61,7 +197,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
memcpy
(
data
,
msg
,
msgLen
);
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
tsdbUpdateSmaWindow
(
pTq
->
pVnode
->
pTsdb
,
msg
,
ver
sion
)
!=
0
)
{
if
(
tsdbUpdateSmaWindow
(
pTq
->
pVnode
->
pTsdb
,
msg
,
ver
)
!=
0
)
{
return
-
1
;
}
}
...
...
@@ -71,6 +207,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
.
pCont
=
data
,
.
contLen
=
msgLen
,
};
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
FETCH_QUEUE
,
&
req
);
#if 0
...
...
@@ -240,6 +377,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
)
{
SMqPollReqV2
*
pReq
=
pMsg
->
pCont
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
waitTime
=
pReq
->
blockingTime
;
int32_t
reqEpoch
=
pReq
->
epoch
;
int64_t
fetchOffset
;
...
...
@@ -265,8 +403,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
while
(
1
)
{
consumerEpoch
=
atomic_load_32
(
&
pExec
->
epoch
);
...
...
@@ -283,6 +421,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// response to user
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
);
#if 0
// add to pushMgr
taosWLockLatch(&pExec->pushHandle.lock);
pExec->pushHandle.consumerId = consumerId;
pExec->pushHandle.epoch = reqEpoch;
pExec->pushHandle.reqOffset = rsp.reqOffset;
pExec->pushHandle.skipLogNum = rsp.skipLogNum;
pExec->pushHandle.handle = pMsg;
taosWUnLockLatch(&pExec->pushHandle.lock);
// TODO add timer
// TODO: the pointer will always be valid?
taosHashPut(pTq->pushMgr, &consumerId, sizeof(int64_t), &pExec, sizeof(void*));
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
return 0;
#endif
break
;
}
...
...
@@ -325,7 +485,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp
.
blockNum
++
;
}
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
STqReadHandle
*
pReader
=
pExec
->
p
Stream
Reader
[
workerId
];
STqReadHandle
*
pReader
=
pExec
->
p
Exec
Reader
[
workerId
];
tqReadHandleSetMsg
(
pReader
,
pCont
,
0
);
while
(
tqNextDataBlock
(
pReader
))
{
SSDataBlock
block
=
{
0
};
...
...
@@ -635,10 +795,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req
.
qmsg
=
NULL
;
pExec
->
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
4
;
i
++
)
{
pExec
->
p
Stream
Reader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
pExec
->
p
Exec
Reader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
pExec
->
p
Stream
Reader
[
i
],
.
reader
=
pExec
->
p
Exec
Reader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
};
pExec
->
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pExec
->
qmsg
,
&
handle
);
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
ce63ad05
...
...
@@ -12,73 +12,3 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
int32_t
tqPushMgrInit
()
{
//
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tqPushMgmt
.
inited
,
0
,
1
);
if
(
old
==
1
)
return
0
;
tqPushMgmt
.
timer
=
taosTmrInit
(
0
,
0
,
0
,
"TQ"
);
return
0
;
}
void
tqPushMgrCleanUp
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tqPushMgmt
.
inited
,
1
,
0
);
if
(
old
==
0
)
return
;
taosTmrStop
(
tqPushMgmt
.
timer
);
taosTmrCleanUp
(
tqPushMgmt
.
timer
);
}
STqPushMgr
*
tqPushMgrOpen
()
{
STqPushMgr
*
mgr
=
taosMemoryMalloc
(
sizeof
(
STqPushMgr
));
if
(
mgr
==
NULL
)
{
return
NULL
;
}
mgr
->
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
return
mgr
;
}
void
tqPushMgrClose
(
STqPushMgr
*
pushMgr
)
{
taosHashCleanup
(
pushMgr
->
pHash
);
taosMemoryFree
(
pushMgr
);
}
STqClientPusher
*
tqAddClientPusher
(
STqPushMgr
*
pushMgr
,
SRpcMsg
*
pMsg
,
int64_t
consumerId
,
int64_t
ttl
)
{
STqClientPusher
*
clientPusher
=
taosMemoryMalloc
(
sizeof
(
STqClientPusher
));
if
(
clientPusher
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
clientPusher
->
type
=
TQ_PUSHER_TYPE__CLIENT
;
clientPusher
->
pMsg
=
pMsg
;
clientPusher
->
consumerId
=
consumerId
;
clientPusher
->
ttl
=
ttl
;
if
(
taosHashPut
(
pushMgr
->
pHash
,
&
consumerId
,
sizeof
(
int64_t
),
&
clientPusher
,
sizeof
(
void
*
))
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
clientPusher
);
// TODO send rsp back
return
NULL
;
}
return
clientPusher
;
}
STqStreamPusher
*
tqAddStreamPusher
(
STqPushMgr
*
pushMgr
,
int64_t
streamId
,
SEpSet
*
pEpSet
)
{
STqStreamPusher
*
streamPusher
=
taosMemoryMalloc
(
sizeof
(
STqStreamPusher
));
if
(
streamPusher
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
streamPusher
->
type
=
TQ_PUSHER_TYPE__STREAM
;
streamPusher
->
nodeType
=
0
;
streamPusher
->
streamId
=
streamId
;
/*memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));*/
if
(
taosHashPut
(
pushMgr
->
pHash
,
&
streamId
,
sizeof
(
int64_t
),
&
streamPusher
,
sizeof
(
void
*
))
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
streamPusher
);
return
NULL
;
}
return
streamPusher
;
}
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
ce63ad05
...
...
@@ -90,21 +90,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
// TODO: handle error
}
break
;
#if 0
case TDMT_VND_MQ_SET_CONN: {
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO: handle error
}
} break;
case TDMT_VND_MQ_REB: {
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
}
} break;
case TDMT_VND_MQ_CANCEL_CONN: {
if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
}
} break;
#endif
case
TDMT_VND_TASK_DEPLOY
:
{
if
(
tqProcessTaskDeploy
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
...
...
source/os/src/osSemaphore.c
浏览文件 @
ce63ad05
...
...
@@ -85,11 +85,11 @@ int32_t tsem_wait(tsem_t* sem) {
#include <mach/semaphore.h>
#include <mach/task.h>
static
TdThread
sem_thread
;
static
TdThread
sem_thread
;
static
TdThreadOnce
sem_once
;
static
task_t
sem_port
;
static
volatile
int
sem_inited
=
0
;
static
semaphore_t
sem_exit
;
static
task_t
sem_port
;
static
volatile
int
sem_inited
=
0
;
static
semaphore_t
sem_exit
;
static
void
*
sem_thread_routine
(
void
*
arg
)
{
(
void
)
arg
;
...
...
@@ -122,12 +122,12 @@ static void once_init(void) {
struct
tsem_s
{
#ifdef SEM_USE_PTHREAD
TdThreadMutex
lock
;
TdThreadCond
cond
;
TdThreadMutex
lock
;
TdThreadCond
cond
;
volatile
int64_t
val
;
#elif defined(SEM_USE_POSIX)
size_t
id
;
sem_t
*
sem
;
sem_t
*
sem
;
#elif defined(SEM_USE_SEM)
semaphore_t
sem
;
#else // SEM_USE_PTHREAD
...
...
@@ -140,7 +140,8 @@ struct tsem_s {
int
tsem_init
(
tsem_t
*
sem
,
int
pshared
,
unsigned
int
value
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
if
(
*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already initialized
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already initialized
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
(
struct
tsem_s
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
p
));
...
...
@@ -180,20 +181,22 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
int
e
=
errno
;
if
(
e
==
EEXIST
)
continue
;
if
(
e
==
EINTR
)
continue
;
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created[%d]%s
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created[%d]%s
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
abort
();
}
while
(
p
->
sem
==
SEM_FAILED
);
#elif defined(SEM_USE_SEM)
taosThreadOnce
(
&
sem_once
,
once_init
);
if
(
sem_inited
!=
1
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal resource init failed
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal resource init failed
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
errno
=
ENOMEM
;
return
-
1
;
}
kern_return_t
ret
=
semaphore_create
(
sem_port
,
&
p
->
sem
,
SYNC_POLICY_FIFO
,
value
);
if
(
ret
!=
KERN_SUCCESS
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==semophore_create failed
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==semophore_create failed
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
// we fail-fast here, because we have less-doc about semaphore_create for the moment
abort
();
}
...
...
@@ -224,18 +227,21 @@ int tsem_wait(tsem_t *sem) {
}
#ifdef SEM_USE_PTHREAD
if
(
taosThreadMutexLock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
val
-=
1
;
if
(
p
->
val
<
0
)
{
if
(
taosThreadCondWait
(
&
p
->
cond
,
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
}
if
(
taosThreadMutexUnlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
return
0
;
...
...
@@ -260,18 +266,21 @@ int tsem_post(tsem_t *sem) {
}
#ifdef SEM_USE_PTHREAD
if
(
taosThreadMutexLock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
val
+=
1
;
if
(
p
->
val
<=
0
)
{
if
(
taosThreadCondSignal
(
&
p
->
cond
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
}
if
(
taosThreadMutexUnlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
return
0
;
...
...
@@ -293,26 +302,30 @@ int tsem_destroy(tsem_t *sem) {
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
// abort();
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
//
sem);
abort();
return
0
;
}
#ifdef SEM_USE_PTHREAD
if
(
taosThreadMutexLock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
valid
=
0
;
if
(
taosThreadCondDestroy
(
&
p
->
cond
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
if
(
taosThreadMutexUnlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
if
(
taosThreadMutexDestroy
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#elif defined(SEM_USE_POSIX)
...
...
@@ -321,8 +334,8 @@ int tsem_destroy(tsem_t *sem) {
int
r
=
sem_unlink
(
name
);
if
(
r
)
{
int
e
=
errno
;
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==unlink failed[%d]%s
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==unlink failed[%d]%s
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
abort
();
}
#elif defined(SEM_USE_SEM)
...
...
@@ -424,4 +437,17 @@ int32_t tsem_wait(tsem_t* sem) {
return
ret
;
}
int32_t
tsem_timewait
(
tsem_t
*
sem
,
int64_t
nanosecs
)
{
int
ret
=
0
;
struct
timespec
tv
=
{
.
tv_sec
=
0
,
.
tv_nsec
=
nanosecs
,
};
while
((
ret
=
sem_timedwait
(
sem
,
&
tv
))
==
-
1
&&
errno
==
EINTR
)
continue
;
return
ret
;
}
#endif
source/util/src/tcache.c
浏览文件 @
ce63ad05
...
...
@@ -20,16 +20,16 @@
#include "tlog.h"
#include "tutil.h"
#define CACHE_MAX_CAPACITY
1024*1024*
16
#define CACHE_DEFAULT_CAPACITY 1024
*
4
#define CACHE_MAX_CAPACITY
1024 * 1024 *
16
#define CACHE_DEFAULT_CAPACITY 1024
*
4
static
TdThread
cacheRefreshWorker
=
{
0
};
static
TdThread
cacheRefreshWorker
=
{
0
};
static
TdThreadOnce
cacheThreadInit
=
PTHREAD_ONCE_INIT
;
static
TdThreadMutex
guard
=
TD_PTHREAD_MUTEX_INITIALIZER
;
static
SArray
*
pCacheArrayList
=
NULL
;
static
bool
stopRefreshWorker
=
false
;
static
bool
refreshWorkerNormalStopped
=
false
;
static
bool
refreshWorkerUnexpectedStopped
=
false
;
static
SArray
*
pCacheArrayList
=
NULL
;
static
bool
stopRefreshWorker
=
false
;
static
bool
refreshWorkerNormalStopped
=
false
;
static
bool
refreshWorkerUnexpectedStopped
=
false
;
typedef
struct
SCacheNode
{
uint64_t
addedTime
;
// the added time when this element is added or updated into cache
...
...
@@ -37,7 +37,7 @@ typedef struct SCacheNode {
int64_t
expireTime
;
// expire time
uint64_t
signature
;
struct
STrashElem
*
pTNodeHeader
;
// point to trash node head
uint16_t
keyLen
:
15
;
// max key size: 32kb
uint16_t
keyLen
:
15
;
// max key size: 32kb
bool
inTrashcan
:
1
;
// denote if it is in trash or not
uint32_t
size
;
// allocated size for current SCacheNode
uint32_t
dataLen
;
...
...
@@ -48,8 +48,8 @@ typedef struct SCacheNode {
}
SCacheNode
;
typedef
struct
SCacheEntry
{
int32_t
num
;
// number of elements in current entry
SRWLatch
latch
;
// entry latch
int32_t
num
;
// number of elements in current entry
SRWLatch
latch
;
// entry latch
SCacheNode
*
next
;
}
SCacheEntry
;
...
...
@@ -76,24 +76,24 @@ typedef struct SCacheIter {
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
*/
struct
SCacheObj
{
int64_t
sizeInBytes
;
// total allocated buffer in this hash table, SCacheObj is not included.
int64_t
refreshTime
;
char
*
name
;
SCacheStatis
statistics
;
SCacheEntry
*
pEntryList
;
size_t
capacity
;
// number of slots
size_t
numOfElems
;
// number of elements in cache
_hash_fn_t
hashFp
;
// hash function
__cache_free_fn_t
freeFp
;
uint32_t
numOfElemsInTrash
;
// number of element in trash
STrashElem
*
pTrash
;
uint8_t
deleting
;
// set the deleting flag to stop refreshing ASAP.
TdThread
refreshWorker
;
bool
extendLifespan
;
// auto extend life span when one item is accessed.
int64_t
checkTick
;
// tick used to record the check times of the refresh threads
int64_t
sizeInBytes
;
// total allocated buffer in this hash table, SCacheObj is not included.
int64_t
refreshTime
;
char
*
name
;
SCacheStatis
statistics
;
SCacheEntry
*
pEntryList
;
size_t
capacity
;
// number of slots
size_t
numOfElems
;
// number of elements in cache
_hash_fn_t
hashFp
;
// hash function
__cache_free_fn_t
freeFp
;
uint32_t
numOfElemsInTrash
;
// number of element in trash
STrashElem
*
pTrash
;
uint8_t
deleting
;
// set the deleting flag to stop refreshing ASAP.
TdThread
refreshWorker
;
bool
extendLifespan
;
// auto extend life span when one item is accessed.
int64_t
checkTick
;
// tick used to record the check times of the refresh threads
#if defined(LINUX)
TdThreadRwlock
lock
;
#else
...
...
@@ -183,7 +183,7 @@ TdThread doRegisterCacheObj(SCacheObj *pCacheObj) {
* @return SCacheNode
*/
static
SCacheNode
*
taosCreateCacheNode
(
const
char
*
key
,
size_t
keyLen
,
const
char
*
pData
,
size_t
size
,
uint64_t
duration
);
uint64_t
duration
);
/**
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
...
...
@@ -268,7 +268,7 @@ static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
pEntry
->
num
+=
1
;
}
static
void
removeNodeInEntryList
(
SCacheEntry
*
pe
,
SCacheNode
*
prev
,
SCacheNode
*
pNode
)
{
static
void
removeNodeInEntryList
(
SCacheEntry
*
pe
,
SCacheNode
*
prev
,
SCacheNode
*
pNode
)
{
if
(
prev
==
NULL
)
{
ASSERT
(
pe
->
next
==
pNode
);
pe
->
next
=
pNode
->
pNext
;
...
...
@@ -280,14 +280,14 @@ static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode*
pe
->
num
-=
1
;
}
static
FORCE_INLINE
SCacheEntry
*
doFindEntry
(
SCacheObj
*
pCacheObj
,
const
void
*
key
,
size_t
keyLen
)
{
static
FORCE_INLINE
SCacheEntry
*
doFindEntry
(
SCacheObj
*
pCacheObj
,
const
void
*
key
,
size_t
keyLen
)
{
uint32_t
hashVal
=
(
*
pCacheObj
->
hashFp
)(
key
,
keyLen
);
int32_t
slot
=
hashVal
%
pCacheObj
->
capacity
;
return
&
pCacheObj
->
pEntryList
[
slot
];
}
static
FORCE_INLINE
SCacheNode
*
doSearchInEntryList
(
SCacheEntry
*
pe
,
const
void
*
key
,
size_t
keyLen
,
SCacheNode
**
prev
)
{
static
FORCE_INLINE
SCacheNode
*
doSearchInEntryList
(
SCacheEntry
*
pe
,
const
void
*
key
,
size_t
keyLen
,
SCacheNode
**
prev
)
{
SCacheNode
*
pNode
=
pe
->
next
;
while
(
pNode
)
{
if
((
pNode
->
keyLen
==
keyLen
)
&&
memcmp
(
pNode
->
key
,
key
,
keyLen
)
==
0
)
{
...
...
@@ -300,9 +300,9 @@ doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode*
return
pNode
;
}
static
bool
doRemoveExpiredFn
(
void
*
param
,
SCacheNode
*
pNode
)
{
static
bool
doRemoveExpiredFn
(
void
*
param
,
SCacheNode
*
pNode
)
{
SCacheObjTravSup
*
ps
=
(
SCacheObjTravSup
*
)
param
;
SCacheObj
*
pCacheObj
=
ps
->
pCacheObj
;
SCacheObj
*
pCacheObj
=
ps
->
pCacheObj
;
if
((
int64_t
)
pNode
->
expireTime
<
ps
->
time
&&
T_REF_VAL_GET
(
pNode
)
<=
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
...
...
@@ -321,7 +321,7 @@ static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) {
static
bool
doRemoveNodeFn
(
void
*
param
,
SCacheNode
*
pNode
)
{
SCacheObjTravSup
*
ps
=
(
SCacheObjTravSup
*
)
param
;
SCacheObj
*
pCacheObj
=
ps
->
pCacheObj
;
SCacheObj
*
pCacheObj
=
ps
->
pCacheObj
;
if
(
T_REF_VAL_GET
(
pNode
)
==
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
...
...
@@ -348,14 +348,14 @@ static FORCE_INLINE int32_t getCacheCapacity(int32_t length) {
len
=
(
len
<<
1u
);
}
return
len
>
CACHE_MAX_CAPACITY
?
CACHE_MAX_CAPACITY
:
len
;
return
len
>
CACHE_MAX_CAPACITY
?
CACHE_MAX_CAPACITY
:
len
;
}
SCacheObj
*
taosCacheInit
(
int32_t
keyType
,
int64_t
refreshTimeIn
Second
s
,
bool
extendLifespan
,
__cache_free_fn_t
fn
,
SCacheObj
*
taosCacheInit
(
int32_t
keyType
,
int64_t
refreshTimeIn
M
s
,
bool
extendLifespan
,
__cache_free_fn_t
fn
,
const
char
*
cacheName
)
{
const
int32_t
SLEEP_DURATION
=
500
;
// 500 ms
if
(
refreshTimeIn
Second
s
<=
0
)
{
if
(
refreshTimeIn
M
s
<=
0
)
{
return
NULL
;
}
...
...
@@ -375,10 +375,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
}
// set free cache node callback function
pCacheObj
->
hashFp
=
taosGetDefaultHashFunction
(
keyType
);
pCacheObj
->
freeFp
=
fn
;
pCacheObj
->
refreshTime
=
refreshTimeInSeconds
*
1000
;
pCacheObj
->
checkTick
=
pCacheObj
->
refreshTime
/
SLEEP_DURATION
;
pCacheObj
->
hashFp
=
taosGetDefaultHashFunction
(
keyType
);
pCacheObj
->
freeFp
=
fn
;
pCacheObj
->
refreshTime
=
refreshTimeInMs
;
pCacheObj
->
checkTick
=
pCacheObj
->
refreshTime
/
SLEEP_DURATION
;
pCacheObj
->
extendLifespan
=
extendLifespan
;
// the TTL after the last access
if
(
__trashcan_lock_init
(
pCacheObj
)
!=
0
)
{
...
...
@@ -412,8 +412,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
taosWLockLatch
(
&
pe
->
latch
);
SCacheNode
*
prev
=
NULL
;
SCacheNode
*
pNode
=
doSearchInEntryList
(
pe
,
key
,
keyLen
,
&
prev
);
SCacheNode
*
prev
=
NULL
;
SCacheNode
*
pNode
=
doSearchInEntryList
(
pe
,
key
,
keyLen
,
&
prev
);
if
(
pNode
==
NULL
)
{
pushfrontNodeInEntryList
(
pe
,
pNode1
);
...
...
@@ -461,12 +461,12 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
return
NULL
;
}
SCacheNode
*
prev
=
NULL
;
SCacheNode
*
prev
=
NULL
;
SCacheEntry
*
pe
=
doFindEntry
(
pCacheObj
,
key
,
keyLen
);
taosRLockLatch
(
&
pe
->
latch
);
SCacheNode
*
pNode
=
doSearchInEntryList
(
pe
,
key
,
keyLen
,
&
prev
);
SCacheNode
*
pNode
=
doSearchInEntryList
(
pe
,
key
,
keyLen
,
&
prev
);
if
(
pNode
!=
NULL
)
{
int32_t
ref
=
T_REF_INC
(
pNode
);
ASSERT
(
ref
>
0
);
...
...
@@ -590,7 +590,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
}
else
{
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
// when reaches here.
SCacheNode
*
prev
=
NULL
;
SCacheNode
*
prev
=
NULL
;
SCacheEntry
*
pe
=
doFindEntry
(
pCacheObj
,
pNode
->
key
,
pNode
->
keyLen
);
taosWLockLatch
(
&
pe
->
latch
);
...
...
@@ -647,7 +647,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
}
}
void
doTraverseElems
(
SCacheObj
*
pCacheObj
,
bool
(
*
fp
)(
void
*
param
,
SCacheNode
*
pNode
),
SCacheObjTravSup
*
pSup
)
{
void
doTraverseElems
(
SCacheObj
*
pCacheObj
,
bool
(
*
fp
)(
void
*
param
,
SCacheNode
*
pNode
),
SCacheObjTravSup
*
pSup
)
{
int32_t
numOfEntries
=
(
int32_t
)
pCacheObj
->
capacity
;
for
(
int32_t
i
=
0
;
i
<
numOfEntries
;
++
i
)
{
SCacheEntry
*
pEntry
=
&
pCacheObj
->
pEntryList
[
i
];
...
...
@@ -676,7 +676,7 @@ void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* p
}
}
void
taosCacheEmpty
(
SCacheObj
*
pCacheObj
)
{
void
taosCacheEmpty
(
SCacheObj
*
pCacheObj
)
{
SCacheObjTravSup
sup
=
{.
pCacheObj
=
pCacheObj
,
.
fp
=
NULL
,
.
time
=
taosGetTimestampMs
()};
doTraverseElems
(
pCacheObj
,
doRemoveNodeFn
,
&
sup
);
taosTrashcanEmpty
(
pCacheObj
,
false
);
...
...
@@ -711,20 +711,20 @@ SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pDat
return
NULL
;
}
pNewNode
->
data
=
(
char
*
)
pNewNode
+
sizeof
(
SCacheNode
);
pNewNode
->
data
=
(
char
*
)
pNewNode
+
sizeof
(
SCacheNode
);
pNewNode
->
dataLen
=
size
;
memcpy
(
pNewNode
->
data
,
pData
,
size
);
pNewNode
->
key
=
(
char
*
)
pNewNode
+
sizeof
(
SCacheNode
)
+
size
;
pNewNode
->
key
=
(
char
*
)
pNewNode
+
sizeof
(
SCacheNode
)
+
size
;
pNewNode
->
keyLen
=
(
uint16_t
)
keyLen
;
memcpy
(
pNewNode
->
key
,
key
,
keyLen
);
pNewNode
->
addedTime
=
(
uint64_t
)
taosGetTimestampMs
();
pNewNode
->
lifespan
=
duration
;
pNewNode
->
addedTime
=
(
uint64_t
)
taosGetTimestampMs
();
pNewNode
->
lifespan
=
duration
;
pNewNode
->
expireTime
=
pNewNode
->
addedTime
+
pNewNode
->
lifespan
;
pNewNode
->
signature
=
(
uint64_t
)
pNewNode
;
pNewNode
->
size
=
(
uint32_t
)
sizeInBytes
;
pNewNode
->
signature
=
(
uint64_t
)
pNewNode
;
pNewNode
->
size
=
(
uint32_t
)
sizeInBytes
;
return
pNewNode
;
}
...
...
@@ -915,21 +915,19 @@ void taosStopCacheRefreshWorker(void) {
taosArrayDestroy
(
pCacheArrayList
);
}
size_t
taosCacheGetNumOfObj
(
const
SCacheObj
*
pCacheObj
)
{
return
pCacheObj
->
numOfElems
+
pCacheObj
->
numOfElemsInTrash
;
}
size_t
taosCacheGetNumOfObj
(
const
SCacheObj
*
pCacheObj
)
{
return
pCacheObj
->
numOfElems
+
pCacheObj
->
numOfElemsInTrash
;
}
SCacheIter
*
taosCacheCreateIter
(
const
SCacheObj
*
pCacheObj
)
{
SCacheIter
*
taosCacheCreateIter
(
const
SCacheObj
*
pCacheObj
)
{
ASSERT
(
pCacheObj
!=
NULL
);
SCacheIter
*
pIter
=
taosMemoryCalloc
(
1
,
sizeof
(
SCacheIter
));
pIter
->
pCacheObj
=
(
SCacheObj
*
)
pCacheObj
;
SCacheIter
*
pIter
=
taosMemoryCalloc
(
1
,
sizeof
(
SCacheIter
));
pIter
->
pCacheObj
=
(
SCacheObj
*
)
pCacheObj
;
pIter
->
entryIndex
=
-
1
;
pIter
->
index
=
-
1
;
pIter
->
index
=
-
1
;
return
pIter
;
}
bool
taosCacheIterNext
(
SCacheIter
*
pIter
)
{
SCacheObj
*
pCacheObj
=
pIter
->
pCacheObj
;
bool
taosCacheIterNext
(
SCacheIter
*
pIter
)
{
SCacheObj
*
pCacheObj
=
pIter
->
pCacheObj
;
if
(
pIter
->
index
+
1
>=
pIter
->
numOfObj
)
{
if
(
pIter
->
entryIndex
+
1
>=
pCacheObj
->
capacity
)
{
...
...
@@ -937,9 +935,9 @@ bool taosCacheIterNext(SCacheIter* pIter) {
}
// release the reference for all objects in the snapshot
for
(
int32_t
i
=
0
;
i
<
pIter
->
numOfObj
;
++
i
)
{
char
*
p
=
pIter
->
pCurrent
[
i
]
->
data
;
taosCacheRelease
(
pCacheObj
,
(
void
**
)
&
p
,
false
);
for
(
int32_t
i
=
0
;
i
<
pIter
->
numOfObj
;
++
i
)
{
char
*
p
=
pIter
->
pCurrent
[
i
]
->
data
;
taosCacheRelease
(
pCacheObj
,
(
void
**
)
&
p
,
false
);
pIter
->
pCurrent
[
i
]
=
NULL
;
}
...
...
@@ -968,7 +966,7 @@ bool taosCacheIterNext(SCacheIter* pIter) {
pIter
->
pCurrent
=
(
SCacheNode
**
)
tmp
;
}
SCacheNode
*
pNode
=
pEntry
->
next
;
SCacheNode
*
pNode
=
pEntry
->
next
;
for
(
int32_t
i
=
0
;
i
<
pEntry
->
num
;
++
i
)
{
ASSERT
(
pNode
!=
NULL
);
...
...
@@ -982,7 +980,7 @@ bool taosCacheIterNext(SCacheIter* pIter) {
pIter
->
numOfObj
=
pEntry
->
num
;
taosRUnLockLatch
(
&
pEntry
->
latch
);
pIter
->
index
=
-
1
;
pIter
->
index
=
-
1
;
break
;
}
}
...
...
@@ -991,19 +989,19 @@ bool taosCacheIterNext(SCacheIter* pIter) {
return
true
;
}
void
*
taosCacheIterGetData
(
const
SCacheIter
*
pIter
,
size_t
*
len
)
{
SCacheNode
*
pNode
=
pIter
->
pCurrent
[
pIter
->
index
];
void
*
taosCacheIterGetData
(
const
SCacheIter
*
pIter
,
size_t
*
len
)
{
SCacheNode
*
pNode
=
pIter
->
pCurrent
[
pIter
->
index
];
*
len
=
pNode
->
dataLen
;
return
pNode
->
data
;
}
void
*
taosCacheIterGetKey
(
const
SCacheIter
*
pIter
,
size_t
*
len
)
{
SCacheNode
*
pNode
=
pIter
->
pCurrent
[
pIter
->
index
];
void
*
taosCacheIterGetKey
(
const
SCacheIter
*
pIter
,
size_t
*
len
)
{
SCacheNode
*
pNode
=
pIter
->
pCurrent
[
pIter
->
index
];
*
len
=
pNode
->
keyLen
;
return
pNode
->
key
;
}
void
taosCacheDestroyIter
(
SCacheIter
*
pIter
)
{
void
taosCacheDestroyIter
(
SCacheIter
*
pIter
)
{
taosMemoryFreeClear
(
pIter
->
pCurrent
);
taosMemoryFreeClear
(
pIter
);
}
\ No newline at end of file
}
source/util/test/cacheTest.cpp
浏览文件 @
ce63ad05
#include <iostream>
#include <gtest/gtest.h>
#include <iostream>
#include "os.h"
#include "taos.h"
...
...
@@ -8,132 +8,134 @@
// test cache
TEST
(
cacheTest
,
client_cache_test
)
{
const
int32_t
REFRESH_TIME_IN_SEC
=
2
;
SCacheObj
*
tscMetaCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
REFRESH_TIME_IN_SEC
,
0
,
NULL
,
"test"
);
SCacheObj
*
tscMetaCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
REFRESH_TIME_IN_SEC
*
1000
,
0
,
NULL
,
"test"
);
const
char
*
key1
=
"test1"
;
char
data1
[]
=
"test11"
;
char
data1
[]
=
"test11"
;
char
*
cachedObj
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key1
,
strlen
(
key1
),
data1
,
strlen
(
data1
)
+
1
,
1
);
taosSsleep
(
REFRESH_TIME_IN_SEC
+
1
);
char
*
cachedObj
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key1
,
strlen
(
key1
),
data1
,
strlen
(
data1
)
+
1
,
1
);
taosSsleep
(
REFRESH_TIME_IN_SEC
+
1
);
printf
(
"obj is still valid: %s
\n
"
,
cachedObj
);
char
data2
[]
=
"test22"
;
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj
,
false
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj
,
false
);
/* the object is cleared by cache clean operation */
cachedObj
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key1
,
strlen
(
key1
),
data2
,
strlen
(
data2
)
+
1
,
20
);
cachedObj
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key1
,
strlen
(
key1
),
data2
,
strlen
(
data2
)
+
1
,
20
);
printf
(
"after updated: %s
\n
"
,
cachedObj
);
printf
(
"start to remove data from cache
\n
"
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj
,
false
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj
,
false
);
printf
(
"end of removing data from cache
\n
"
);
const
char
*
key3
=
"test2"
;
const
char
*
data3
=
"kkkkkkk"
;
char
*
cachedObj2
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key3
,
strlen
(
key3
),
data3
,
strlen
(
data3
)
+
1
,
1
);
char
*
cachedObj2
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key3
,
strlen
(
key3
),
data3
,
strlen
(
data3
)
+
1
,
1
);
printf
(
"%s
\n
"
,
cachedObj2
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj2
,
false
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj2
,
false
);
taosSsleep
(
3
);
char
*
d
=
(
char
*
)
taosCacheAcquireByKey
(
tscMetaCache
,
key3
,
strlen
(
key3
));
char
*
d
=
(
char
*
)
taosCacheAcquireByKey
(
tscMetaCache
,
key3
,
strlen
(
key3
));
assert
(
d
==
NULL
);
char
key5
[]
=
"test5"
;
char
data5
[]
=
"data5kkkkk"
;
cachedObj2
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key5
,
strlen
(
key5
),
data5
,
strlen
(
data5
)
+
1
,
20
);
cachedObj2
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key5
,
strlen
(
key5
),
data5
,
strlen
(
data5
)
+
1
,
20
);
const
char
*
data6
=
"new Data after updated"
;
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj2
,
false
);
const
char
*
data6
=
"new Data after updated"
;
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj2
,
false
);
cachedObj2
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key5
,
strlen
(
key5
),
data6
,
strlen
(
data6
)
+
1
,
20
);
cachedObj2
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key5
,
strlen
(
key5
),
data6
,
strlen
(
data6
)
+
1
,
20
);
printf
(
"%s
\n
"
,
cachedObj2
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj2
,
true
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj2
,
true
);
const
char
*
data7
=
"add call update procedure"
;
cachedObj2
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key5
,
strlen
(
key5
),
data7
,
strlen
(
data7
)
+
1
,
20
);
cachedObj2
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key5
,
strlen
(
key5
),
data7
,
strlen
(
data7
)
+
1
,
20
);
printf
(
"%s
\n
=======================================
\n\n
"
,
cachedObj2
);
char
*
cc
=
(
char
*
)
taosCacheAcquireByKey
(
tscMetaCache
,
key5
,
strlen
(
key5
));
char
*
cc
=
(
char
*
)
taosCacheAcquireByKey
(
tscMetaCache
,
key5
,
strlen
(
key5
));
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj2
,
true
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cc
,
false
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cachedObj2
,
true
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
cc
,
false
);
const
char
*
data8
=
"ttft"
;
const
char
*
key6
=
"key6"
;
char
*
ft
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key6
,
strlen
(
key6
),
data8
,
strlen
(
data8
),
20
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
ft
,
false
);
char
*
ft
=
(
char
*
)
taosCachePut
(
tscMetaCache
,
key6
,
strlen
(
key6
),
data8
,
strlen
(
data8
),
20
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
ft
,
false
);
/**
* 140ns
*/
uint64_t
startTime
=
taosGetTimestampUs
();
printf
(
"Cache Performance Test
\n
start time:%"
PRIu64
"
\n
"
,
startTime
);
for
(
int32_t
i
=
0
;
i
<
1000
;
++
i
)
{
char
*
dd
=
(
char
*
)
taosCacheAcquireByKey
(
tscMetaCache
,
key6
,
strlen
(
key6
));
for
(
int32_t
i
=
0
;
i
<
1000
;
++
i
)
{
char
*
dd
=
(
char
*
)
taosCacheAcquireByKey
(
tscMetaCache
,
key6
,
strlen
(
key6
));
if
(
dd
!=
NULL
)
{
// printf("get the data\n");
// printf("get the data\n");
}
else
{
printf
(
"data has been released
\n
"
);
}
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
dd
,
false
);
taosCacheRelease
(
tscMetaCache
,
(
void
**
)
&
dd
,
false
);
}
uint64_t
endTime
=
taosGetTimestampUs
();
int64_t
el
=
endTime
-
startTime
;
int64_t
el
=
endTime
-
startTime
;
printf
(
"End of Test, %"
PRIu64
"
\n
Total Elapsed Time:%"
PRIu64
" us.avg:%f us
\n
"
,
endTime
,
el
,
el
/
1000.0
);
printf
(
"End of Test, %"
PRIu64
"
\n
Total Elapsed Time:%"
PRIu64
" us.avg:%f us
\n
"
,
endTime
,
el
,
el
/
1000.0
);
taosCacheCleanup
(
tscMetaCache
);
}
TEST
(
cacheTest
,
cache_iter_test
)
{
const
int32_t
REFRESH_TIME_IN_SEC
=
2
;
auto
*
pCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
REFRESH_TIME_IN_SEC
,
false
,
NULL
,
"test"
);
auto
*
pCache
=
taosCacheInit
(
TSDB_DATA_TYPE_BINARY
,
REFRESH_TIME_IN_SEC
*
1000
,
false
,
NULL
,
"test"
);
char
key
[
256
]
=
{
0
};
char
data
[
1024
]
=
"abcdefghijk"
;
// int32_t len = strlen(data);
// int32_t len = strlen(data);
uint64_t
startTime
=
taosGetTimestampUs
();
int32_t
num
=
10000
;
int32_t
num
=
10000
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
int32_t
len
=
sprintf
(
key
,
"abc_%7d"
,
i
);
taosCachePut
(
pCache
,
key
,
strlen
(
key
),
data
,
len
,
3600
);
}
uint64_t
endTime
=
taosGetTimestampUs
();
printf
(
"add %d object cost:%"
PRIu64
" us, avg:%f us
\n
"
,
num
,
endTime
-
startTime
,
(
endTime
-
startTime
)
/
(
double
)
num
);
printf
(
"add %d object cost:%"
PRIu64
" us, avg:%f us
\n
"
,
num
,
endTime
-
startTime
,
(
endTime
-
startTime
)
/
(
double
)
num
);
startTime
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
int32_t
len
=
sprintf
(
key
,
"abc_%7d"
,
i
);
void
*
k
=
taosCacheAcquireByKey
(
pCache
,
key
,
len
);
void
*
k
=
taosCacheAcquireByKey
(
pCache
,
key
,
len
);
assert
(
k
!=
0
);
}
endTime
=
taosGetTimestampUs
();
printf
(
"retrieve %d object cost:%"
PRIu64
" us,avg:%f
\n
"
,
num
,
endTime
-
startTime
,
(
endTime
-
startTime
)
/
(
double
)
num
);
printf
(
"retrieve %d object cost:%"
PRIu64
" us,avg:%f
\n
"
,
num
,
endTime
-
startTime
,
(
endTime
-
startTime
)
/
(
double
)
num
);
int32_t
count
=
0
;
int32_t
count
=
0
;
SCacheIter
*
pIter
=
taosCacheCreateIter
(
pCache
);
while
(
taosCacheIterNext
(
pIter
))
{
while
(
taosCacheIterNext
(
pIter
))
{
size_t
keyLen
=
0
;
size_t
dataLen
=
0
;
char
*
key1
=
static_cast
<
char
*>
(
taosCacheIterGetKey
(
pIter
,
&
keyLen
));
char
*
data1
=
static_cast
<
char
*>
(
taosCacheIterGetData
(
pIter
,
&
dataLen
));
// char d[256] = {0};
// memcpy(d, data1, dataLen);
// char k[256] = {0};
// memcpy(k, key1, keyLen);
// char d[256] = {0};
// memcpy(d, data1, dataLen);
// char k[256] = {0};
// memcpy(k, key1, keyLen);
}
ASSERT_EQ
(
count
,
num
);
...
...
tests/script/jenkins/basic.txt
浏览文件 @
ce63ad05
...
...
@@ -76,7 +76,7 @@
./test.sh -f tsim/insert/backquote.sim -m
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
./test.sh -f tsim/query/interval-offset.sim -m
#./test.sh -f tsim/tmq/basic
.sim -m
./test.sh -f tsim/tmq/basic1
.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m
./test.sh -f tsim/mnode/basic1.sim -m
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录