Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3629958b
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
3629958b
编写于
5月 23, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(tmq): serializer and deserializer for tq exec
上级
930b8500
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
65 addition
and
17 deletion
+65
-17
include/util/tencode.h
include/util/tencode.h
+1
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+5
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+51
-6
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+8
-8
未找到文件。
include/util/tencode.h
浏览文件 @
3629958b
...
...
@@ -82,7 +82,7 @@ typedef struct {
do { \
SEncoder coder = {0}; \
tEncoderInit(&coder, NULL, 0); \
if ((E)(&coder, S)
=
= 0) { \
if ((E)(&coder, S)
>
= 0) { \
SIZE = coder.pos; \
RET = 0; \
} else { \
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
3629958b
...
...
@@ -20,9 +20,9 @@
#include "executor.h"
#include "os.h"
#include "tcache.h"
#include "thash.h"
#include "tmsg.h"
#include "tqueue.h"
#include "trpc.h"
#include "ttimer.h"
#include "wal.h"
...
...
@@ -86,6 +86,9 @@ typedef struct {
qTaskInfo_t
task
[
5
];
}
STqExec
;
int32_t
tEncodeSTqExec
(
SEncoder
*
pEncoder
,
const
STqExec
*
pExec
);
int32_t
tDecodeSTqExec
(
SDecoder
*
pDecoder
,
STqExec
*
pExec
);
struct
STQ
{
char
*
path
;
SHashObj
*
pushMgr
;
// consumerId -> STqExec*
...
...
@@ -93,7 +96,7 @@ struct STQ {
SHashObj
*
pStreamTasks
;
SVnode
*
pVnode
;
SWal
*
pWal
;
//
TDB* pTdb;
TDB
*
pTdb
;
};
typedef
struct
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
3629958b
...
...
@@ -14,14 +14,25 @@
*/
#include "tq.h"
#include "tqueue.h"
int32_t
tqInit
()
{
//
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tqMgmt
.
inited
,
0
,
1
);
if
(
old
==
0
)
{
tqMgmt
.
timer
=
taosTmrInit
(
10000
,
100
,
10000
,
"TQ"
);
if
(
tqMgmt
.
timer
==
NULL
)
{
atomic_store_8
(
&
tqMgmt
.
inited
,
0
);
return
-
1
;
}
}
return
0
;
}
void
tqCleanUp
()
{}
void
tqCleanUp
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tqMgmt
.
inited
,
1
,
2
);
if
(
old
!=
1
)
return
;
taosTmrCleanUp
(
tqMgmt
.
timer
);
atomic_store_8
(
&
tqMgmt
.
inited
,
0
);
}
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
)
{
STQ
*
pTq
=
taosMemoryMalloc
(
sizeof
(
STQ
));
...
...
@@ -32,9 +43,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq
->
path
=
strdup
(
path
);
pTq
->
pVnode
=
pVnode
;
pTq
->
pWal
=
pWal
;
/*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/
/*ASSERT(0);*/
/*}*/
if
(
tdbOpen
(
path
,
4096
,
1
,
&
pTq
->
pTdb
)
<
0
)
{
ASSERT
(
0
);
}
pTq
->
execs
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_ENTRY_LOCK
);
...
...
@@ -51,11 +62,45 @@ void tqClose(STQ* pTq) {
taosHashCleanup
(
pTq
->
execs
);
taosHashCleanup
(
pTq
->
pStreamTasks
);
taosHashCleanup
(
pTq
->
pushMgr
);
tdbClose
(
pTq
->
pTdb
);
taosMemoryFree
(
pTq
);
}
// TODO
}
int32_t
tEncodeSTqExec
(
SEncoder
*
pEncoder
,
const
STqExec
*
pExec
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pExec
->
subKey
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pExec
->
consumerId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pExec
->
epoch
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pExec
->
subType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pExec
->
withTbName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pExec
->
withSchema
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pExec
->
withTag
)
<
0
)
return
-
1
;
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
if
(
tEncodeCStr
(
pEncoder
,
pExec
->
qmsg
)
<
0
)
return
-
1
;
// TODO encode modified exec
}
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeSTqExec
(
SDecoder
*
pDecoder
,
STqExec
*
pExec
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pExec
->
subKey
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pExec
->
consumerId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pExec
->
epoch
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pExec
->
subType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pExec
->
withTbName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pExec
->
withSchema
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pExec
->
withTag
)
<
0
)
return
-
1
;
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pExec
->
qmsg
)
<
0
)
return
-
1
;
// TODO decode modified exec
}
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
tqUpdateTbUidList
(
STQ
*
pTq
,
const
SArray
*
tbUidList
,
bool
isAdd
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
...
...
source/libs/wal/src/walMgmt.c
浏览文件 @
3629958b
...
...
@@ -14,17 +14,17 @@
*/
#define _DEFAULT_SOURCE
#include "tcompare.h"
#include "os.h"
#include "taoserror.h"
#include "tcompare.h"
#include "tref.h"
#include "walInt.h"
typedef
struct
{
int8_t
stop
;
int8_t
inited
;
uint32_t
seq
;
int32_t
refSetId
;
int8_t
stop
;
int8_t
inited
;
uint32_t
seq
;
int32_t
refSetId
;
TdThread
thread
;
}
SWalMgmt
;
...
...
@@ -53,13 +53,14 @@ int32_t walInit() {
}
void
walCleanUp
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tsWal
.
inited
,
1
,
0
);
if
(
old
==
0
)
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tsWal
.
inited
,
1
,
2
);
if
(
old
!=
1
)
{
return
;
}
walStopThread
();
taosCloseRef
(
tsWal
.
refSetId
);
wInfo
(
"wal module is cleaned up"
);
atomic_store_8
(
&
tsWal
.
inited
,
0
);
}
SWal
*
walOpen
(
const
char
*
path
,
SWalCfg
*
pCfg
)
{
...
...
@@ -126,7 +127,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
if
(
walCheckAndRepairIdx
(
pWal
)
<
0
)
{
}
wDebug
(
"vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d"
,
pWal
->
cfg
.
vgId
,
pWal
,
pWal
->
cfg
.
level
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录