Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6beee204
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6beee204
编写于
12月 21, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add tq query
上级
4a24d4bf
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
197 addition
and
41 deletion
+197
-41
include/dnode/vnode/tq/tq.h
include/dnode/vnode/tq/tq.h
+21
-8
include/dnode/vnode/vnode.h
include/dnode/vnode/vnode.h
+10
-0
include/libs/wal/wal.h
include/libs/wal/wal.h
+4
-1
include/util/ttimer.h
include/util/ttimer.h
+2
-0
source/dnode/vnode/tq/CMakeLists.txt
source/dnode/vnode/tq/CMakeLists.txt
+1
-0
source/dnode/vnode/tq/inc/tqInt.h
source/dnode/vnode/tq/inc/tqInt.h
+1
-0
source/dnode/vnode/tq/src/tq.c
source/dnode/vnode/tq/src/tq.c
+153
-29
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+5
-3
未找到文件。
include/dnode/vnode/tq/tq.h
浏览文件 @
6beee204
...
...
@@ -22,6 +22,7 @@
#include "taoserror.h"
#include "taosmsg.h"
#include "tlist.h"
#include "trpc.h"
#include "tutil.h"
#ifdef __cplusplus
...
...
@@ -54,6 +55,7 @@ typedef struct STqSetCurReq {
typedef
struct
STqConsumeReq
{
STqMsgHead
head
;
int64_t
blockingTime
;
// milisec
STqAcks
acks
;
}
STqConsumeReq
;
...
...
@@ -107,6 +109,17 @@ typedef struct STqExec {
struct
STqExec
*
(
*
deserialize
)(
char
*
);
}
STqExec
;
typedef
struct
STqRspHandle
{
void
*
handle
;
void
*
ahandle
;
}
STqRspHandle
;
typedef
enum
{
TQ_ITEM_READY
,
TQ_ITEM_PROCESS
,
TQ_ITEM_EMPTY
}
STqItemStatus
;
typedef
struct
STqBufferItem
{
int64_t
offset
;
// executors are identical but not concurrent
...
...
@@ -135,13 +148,13 @@ typedef struct STqListHandle {
}
STqList
;
typedef
struct
STqGroup
{
int64_t
clientId
;
int64_t
cgId
;
void
*
ahandle
;
int32_t
topicNum
;
STqList
*
head
;
SList
*
topicList
;
// SList<STqTopic>
void
*
returnMsg
;
// SVReadMsg
int64_t
clientId
;
int64_t
cgId
;
void
*
ahandle
;
int32_t
topicNum
;
//STqList*
head;
SList
*
topicList
;
// SList<STqTopic>
STqRspHandle
rspHandle
;
}
STqGroup
;
typedef
struct
STqQueryMsg
{
...
...
@@ -264,7 +277,7 @@ void tqClose(STQ*);
// void* will be replace by a msg type
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int64_t
version
);
int
tqCommit
(
STQ
*
);
int
tqConsume
(
STQ
*
,
S
TqConsumeReq
*
);
int
tqConsume
(
STQ
*
,
S
RpcMsg
*
pReq
,
SRpcMsg
**
pRsp
);
int
tqSetCursor
(
STQ
*
,
STqSetCurReq
*
pMsg
);
int
tqBufferSetOffset
(
STqTopic
*
,
int64_t
offset
);
...
...
include/dnode/vnode/vnode.h
浏览文件 @
6beee204
...
...
@@ -122,6 +122,16 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
*/
int
vnodeApplyWMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
/**
* @brief Process a consume message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int
vnodeProcessCMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
/**
* @brief Process the sync request
*
...
...
include/libs/wal/wal.h
浏览文件 @
6beee204
...
...
@@ -174,8 +174,11 @@ SWalReadHandle *walOpenReadHandle(SWal *);
void
walCloseReadHandle
(
SWalReadHandle
*
);
int32_t
walReadWithHandle
(
SWalReadHandle
*
pRead
,
int64_t
ver
);
// deprecated
#if 0
int32_t walRead(SWal *, SWalHead **, int64_t ver);
// int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
#endif
// lifecycle check
int64_t
walGetFirstVer
(
SWal
*
);
...
...
include/util/ttimer.h
浏览文件 @
6beee204
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_TIMER_H
#define _TD_UTIL_TIMER_H
#include "os.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/dnode/vnode/tq/CMakeLists.txt
浏览文件 @
6beee204
...
...
@@ -12,6 +12,7 @@ target_link_libraries(
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC transport
)
if
(
${
BUILD_TEST
}
)
...
...
source/dnode/vnode/tq/inc/tqInt.h
浏览文件 @
6beee204
...
...
@@ -18,6 +18,7 @@
#include "tq.h"
#include "tlog.h"
#include "trpc.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/dnode/vnode/tq/src/tq.c
浏览文件 @
6beee204
...
...
@@ -14,7 +14,9 @@
*/
#include "tqInt.h"
#include "osSocket.h"
#include "tqMetaStore.h"
#include "osAtomic.h"
// static
// read next version data
...
...
@@ -51,16 +53,22 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
}
pTq
->
tqMeta
=
tqStoreOpen
(
path
,
(
FTqSerialize
)
tqSerializeGroup
,
(
FTqDeserialize
)
tqDeserializeGroup
,
free
,
0
);
if
(
pTq
->
tqMeta
==
NULL
)
{
// TODO: free STQ
free
(
pTq
);
allocFac
->
destroy
(
allocFac
,
pTq
->
tqMemRef
.
pAllocator
);
return
NULL
;
}
return
pTq
;
}
void
tqClose
(
STQ
*
pTq
)
{
// TODO
}
static
int
tqProtoCheck
(
STqMsgHead
*
pMsg
)
{
return
pMsg
->
protoVer
==
0
;
}
static
int
tqProtoCheck
(
STqMsgHead
*
pMsg
)
{
// TODO
return
pMsg
->
protoVer
==
0
;
}
static
int
tqAckOneTopic
(
STqTopic
*
pTopic
,
STqOneAck
*
pAck
,
STqQueryMsg
**
ppQuery
)
{
// clean old item and move forward
...
...
@@ -121,9 +129,15 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup
// TODO
return
-
1
;
}
*
ppGroup
=
pGroup
;
memset
(
pGroup
,
0
,
sizeof
(
STqGroup
));
pGroup
->
topicList
=
tdListNew
(
sizeof
(
STqTopic
));
if
(
pGroup
->
topicList
==
NULL
)
{
free
(
pGroup
);
return
-
1
;
}
*
ppGroup
=
pGroup
;
return
0
;
}
...
...
@@ -152,46 +166,55 @@ int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
return
0
;
}
static
int
tqFetch
(
STqGroup
*
pGroup
,
void
**
msg
)
{
STqList
*
h
ead
=
pGroup
->
head
;
STqList
*
node
=
h
ead
;
static
int
tqFetch
(
STqGroup
*
pGroup
,
STqConsumeRsp
**
pRsp
)
{
STqList
*
pH
ead
=
pGroup
->
head
;
STqList
*
pNode
=
pH
ead
;
int
totSize
=
0
;
int
numOfMsgs
=
0
;
// TODO: make it a macro
int
sizeLimit
=
4
*
1024
;
STqMsgContent
*
buffer
=
malloc
(
sizeLimit
);
if
(
buffer
==
NULL
)
{
// TODO:memory insufficient
int
sizeLimit
=
4
*
1024
;
void
*
ptr
=
realloc
(
*
pRsp
,
sizeof
(
STqConsumeRsp
)
+
sizeLimit
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
*
pRsp
=
ptr
;
STqMsgContent
*
buffer
=
(
*
pRsp
)
->
msgs
;
// iterate the list to get msgs of all topics
// until all topic iterated or msgs over sizeLimit
while
(
n
ode
->
next
)
{
node
=
n
ode
->
next
;
STqTopic
*
topicHandle
=
&
n
ode
->
topic
;
int
idx
=
topicHandle
->
nextConsumeOffset
%
TQ_BUFFER_SIZE
;
if
(
topicHandle
->
buffer
[
idx
].
content
!=
NULL
&&
topicHandle
->
buffer
[
idx
].
offset
==
topicHandle
->
nextConsumeOffset
)
{
totSize
+=
topicHandle
->
buffer
[
idx
].
size
;
while
(
pN
ode
->
next
)
{
pNode
=
pN
ode
->
next
;
STqTopic
*
pTopic
=
&
pN
ode
->
topic
;
int
idx
=
pTopic
->
nextConsumeOffset
%
TQ_BUFFER_SIZE
;
if
(
pTopic
->
buffer
[
idx
].
content
!=
NULL
&&
pTopic
->
buffer
[
idx
].
offset
==
pTopic
->
nextConsumeOffset
)
{
totSize
+=
pTopic
->
buffer
[
idx
].
size
;
if
(
totSize
>
sizeLimit
)
{
void
*
ptr
=
realloc
(
buffer
,
totSize
);
void
*
ptr
=
realloc
(
*
pRsp
,
sizeof
(
STqConsumeRsp
)
+
totSize
);
if
(
ptr
==
NULL
)
{
totSize
-=
topicHandle
->
buffer
[
idx
].
size
;
// TODO:memory insufficient
totSize
-=
pTopic
->
buffer
[
idx
].
size
;
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// return msgs already copied
break
;
}
*
pRsp
=
ptr
;
break
;
}
*
((
int64_t
*
)
buffer
)
=
topicHandle
->
topicId
;
*
((
int64_t
*
)
buffer
)
=
pTopic
->
topicId
;
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
*
((
int64_t
*
)
buffer
)
=
topicHandle
->
buffer
[
idx
].
size
;
*
((
int64_t
*
)
buffer
)
=
pTopic
->
buffer
[
idx
].
size
;
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
memcpy
(
buffer
,
topicHandle
->
buffer
[
idx
].
content
,
topicHandle
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
topicHandle
->
buffer
[
idx
].
size
);
memcpy
(
buffer
,
pTopic
->
buffer
[
idx
].
content
,
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
pTopic
->
buffer
[
idx
].
size
);
numOfMsgs
++
;
if
(
totSize
>
sizeLimit
)
{
break
;
}
}
}
return
totSize
;
(
*
pRsp
)
->
bodySize
=
totSize
;
return
numOfMsgs
;
}
STqGroup
*
tqGetGroup
(
STQ
*
pTq
,
int64_t
clientId
)
{
return
tqHandleGet
(
pTq
->
tqMeta
,
clientId
);
}
...
...
@@ -273,25 +296,126 @@ int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
return
0
;
}
int
tqConsume
(
STQ
*
pTq
,
STqConsumeReq
*
pMsg
)
{
// temporary
int
tqProcessCMsg
(
STQ
*
pTq
,
STqConsumeReq
*
pMsg
,
STqRspHandle
*
pRsp
)
{
int64_t
clientId
=
pMsg
->
head
.
clientId
;
STqGroup
*
pGroup
=
tqGetGroup
(
pTq
,
clientId
);
if
(
pGroup
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_GROUP_NOT_SET
;
return
-
1
;
}
pGroup
->
rspHandle
.
handle
=
pRsp
->
handle
;
pGroup
->
rspHandle
.
ahandle
=
pRsp
->
ahandle
;
return
0
;
}
int
tqConsume
(
STQ
*
pTq
,
SRpcMsg
*
pReq
,
SRpcMsg
**
pRsp
)
{
STqConsumeReq
*
pMsg
=
pReq
->
pCont
;
int64_t
clientId
=
pMsg
->
head
.
clientId
;
STqGroup
*
pGroup
=
tqGetGroup
(
pTq
,
clientId
);
if
(
pGroup
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_GROUP_NOT_SET
;
return
-
1
;
}
SList
*
topicList
=
pGroup
->
topicList
;
int
totSize
=
0
;
int
numOfMsgs
=
0
;
int
sizeLimit
=
4096
;
STqConsumeRsp
*
pCsmRsp
=
(
*
pRsp
)
->
pCont
;
void
*
ptr
=
realloc
((
*
pRsp
)
->
pCont
,
sizeof
(
STqConsumeRsp
)
+
sizeLimit
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
(
*
pRsp
)
->
pCont
=
ptr
;
SListIter
iter
;
tdListInitIter
(
topicList
,
&
iter
,
TD_LIST_FORWARD
);
STqMsgContent
*
buffer
=
NULL
;
SArray
*
pArray
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
SListNode
*
pn
;
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
STqTopic
*
pTopic
=
*
(
STqTopic
**
)
pn
->
data
;
int
idx
=
pTopic
->
floatingCursor
%
TQ_BUFFER_SIZE
;
STqMsgItem
*
pItem
=
&
pTopic
->
buffer
[
idx
];
if
(
pItem
->
content
!=
NULL
&&
pItem
->
offset
==
pTopic
->
floatingCursor
)
{
if
(
pItem
->
status
==
TQ_ITEM_READY
)
{
//if has data
totSize
+=
pTopic
->
buffer
[
idx
].
size
;
if
(
totSize
>
sizeLimit
)
{
void
*
ptr
=
realloc
((
*
pRsp
)
->
pCont
,
sizeof
(
STqConsumeRsp
)
+
totSize
);
if
(
ptr
==
NULL
)
{
totSize
-=
pTopic
->
buffer
[
idx
].
size
;
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// return msgs already copied
break
;
}
(
*
pRsp
)
->
pCont
=
ptr
;
break
;
}
*
((
int64_t
*
)
buffer
)
=
htonll
(
pTopic
->
topicId
);
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
*
((
int64_t
*
)
buffer
)
=
htonll
(
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
memcpy
(
buffer
,
pTopic
->
buffer
[
idx
].
content
,
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
pTopic
->
buffer
[
idx
].
size
);
numOfMsgs
++
;
if
(
totSize
>
sizeLimit
)
{
break
;
}
}
else
if
(
pItem
->
status
==
TQ_ITEM_PROCESS
)
{
//if not have data but in process
}
else
if
(
pItem
->
status
==
TQ_ITEM_EMPTY
){
//if not have data and not in process
int32_t
old
=
atomic_val_compare_exchange_32
(
&
pItem
->
status
,
TQ_ITEM_EMPTY
,
TQ_ITEM_PROCESS
);
if
(
old
!=
TQ_ITEM_EMPTY
)
{
continue
;
}
pItem
->
offset
=
pTopic
->
floatingCursor
;
taosArrayPush
(
pArray
,
&
pItem
);
}
else
{
ASSERT
(
0
);
}
}
}
for
(
int
i
=
0
;
i
<
pArray
->
size
;
i
++
)
{
STqMsgItem
*
pItem
=
taosArrayGet
(
pArray
,
i
);
void
*
raw
;
//read from wal
//get msgType
//if submitblk
pItem
->
executor
->
assign
(
pItem
->
executor
->
runtimeEnv
,
raw
);
SSDataBlock
*
content
=
pItem
->
executor
->
exec
(
pItem
->
executor
->
runtimeEnv
);
pItem
->
content
=
content
;
//if other type, send just put into buffer
pItem
->
content
=
raw
;
int32_t
old
=
atomic_val_compare_exchange_32
(
&
pItem
->
status
,
TQ_ITEM_PROCESS
,
TQ_ITEM_READY
);
ASSERT
(
old
==
TQ_ITEM_PROCESS
);
}
STqConsumeRsp
*
pRsp
=
(
STqConsumeRsp
*
)
pMsg
;
int
numOfMsgs
=
tqFetch
(
pGroup
,
(
void
**
)
&
pRsp
->
msgs
);
if
(
numOfMsgs
<
0
)
{
return
-
1
;
}
if
(
numOfMsgs
==
0
)
{
// most recent data has been fetched
// enable timer for blocking wait
// once new data written
during wait time
// launch query and response
// once new data written
when waiting, launch query and rsp
return
-
1
;
}
// fetched a num of msgs, rpc response
...
...
source/libs/wal/src/walRead.c
浏览文件 @
6beee204
...
...
@@ -170,6 +170,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return
0
;
}
#if 0
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
code = walSeekVer(pWal, ver);
...
...
@@ -207,6 +208,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
return 0;
}
/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/
/*return 0;*/
/*}*/
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录