Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
75297e3a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
75297e3a
编写于
8月 08, 2022
作者:
C
cpwu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into cpwu/3.0
上级
6891c40c
ad551558
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
246 addition
and
144 deletion
+246
-144
docs/zh/05-get-started/03-package.md
docs/zh/05-get-started/03-package.md
+2
-2
docs/zh/07-develop/07-tmq.md
docs/zh/07-develop/07-tmq.md
+3
-3
include/common/tcommon.h
include/common/tcommon.h
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+1
-1
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+21
-8
source/client/src/tmq.c
source/client/src/tmq.c
+2
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+2
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-2
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+2
-2
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+7
-14
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+6
-2
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+9
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+2
-0
source/libs/stream/CMakeLists.txt
source/libs/stream/CMakeLists.txt
+2
-1
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+1
-15
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+69
-42
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+2
-2
source/libs/sync/inc/syncRaftLog.h
source/libs/sync/inc/syncRaftLog.h
+2
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+66
-42
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+6
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+19
-1
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+16
-2
source/libs/transport/src/.transSvr.c.swo
source/libs/transport/src/.transSvr.c.swo
+0
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+4
-2
未找到文件。
docs/zh/05-get-started/03-package.md
浏览文件 @
75297e3a
...
...
@@ -11,7 +11,7 @@ import TabItem from "@theme/TabItem";
:::
TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。
TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。
也支持通过
`apt-get`
工具从线上进行安装。
## 安装
...
...
@@ -293,4 +293,4 @@ taos> select avg(current), max(voltage), min(phase) from test.meters where group
```
sql
taos
>
select
avg
(
current
),
max
(
voltage
),
min
(
phase
)
from
test
.
d10
interval
(
10
s
);
```
\ No newline at end of file
```
docs/zh/07-develop/07-tmq.md
浏览文件 @
75297e3a
...
...
@@ -6,11 +6,11 @@ title: 数据订阅
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic
可以是一张超级表,或一张子表。不仅如此,你可以通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine,而不是应用完成,有效的减少传输的数据量
。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic
是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度
。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程
分布式的消费数据,提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的vnode上,也就是多个shard上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once
消费。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程
、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保 at least once
消费。
为了实现上述功能,TDengine
采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine 从 WAL 中获取数据,并经过
过滤、变换等操作,将数据推送给消费者。
为了实现上述功能,TDengine
会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现
过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
...
...
include/common/tcommon.h
浏览文件 @
75297e3a
...
...
@@ -104,6 +104,7 @@ typedef struct SDataBlockInfo {
uint32_t
capacity
;
// TODO: optimize and remove following
int64_t
version
;
// used for stream, and need serialization
int64_t
ts
;
// used for stream, and need serialization
int32_t
childId
;
// used for stream, do not serialize
EStreamType
type
;
// used for stream, do not serialize
STimeWindow
calWin
;
// used for stream, do not serialize
...
...
include/common/tmsg.h
浏览文件 @
75297e3a
...
...
@@ -3103,7 +3103,7 @@ typedef struct {
void
*
msg
;
}
SBatchRsp
;
static
FORCE_INLINE
void
tFreeSBatchRsp
(
void
*
p
)
{
static
FORCE_INLINE
void
tFreeSBatchRsp
(
void
*
p
)
{
if
(
NULL
==
p
)
{
return
;
}
...
...
include/libs/stream/tstream.h
浏览文件 @
75297e3a
...
...
@@ -17,6 +17,7 @@
#include "os.h"
#include "query.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
...
...
@@ -85,6 +86,12 @@ enum {
TASK_OUTPUT__FETCH
,
};
enum
{
STREAM_QUEUE__SUCESS
=
1
,
STREAM_QUEUE__FAILED
,
STREAM_QUEUE__PROCESSING
,
};
typedef
struct
{
int8_t
type
;
}
SStreamQueueItem
;
...
...
@@ -123,12 +130,6 @@ typedef struct {
SSDataBlock
*
pBlock
;
}
SStreamTrigger
;
enum
{
STREAM_QUEUE__SUCESS
=
1
,
STREAM_QUEUE__FAILED
,
STREAM_QUEUE__PROCESSING
,
};
typedef
struct
{
STaosQueue
*
queue
;
STaosQall
*
qall
;
...
...
@@ -233,6 +234,7 @@ typedef struct {
typedef
struct
SStreamTask
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
totalLevel
;
int8_t
taskLevel
;
int8_t
outputType
;
int16_t
dispatchMsgType
;
...
...
@@ -458,9 +460,20 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
int32_t
streamTryExec
(
SStreamTask
*
pTask
);
int32_t
streamSchedExec
(
SStreamTask
*
pTask
);
typedef
struct
SStreamMeta
SStreamMeta
;
typedef
int32_t
FTaskExpand
(
void
*
ahandle
,
SStreamTask
*
pTask
);
typedef
struct
SStreamMeta
{
char
*
path
;
TDB
*
db
;
TTB
*
pTaskDb
;
TTB
*
pStateDb
;
SHashObj
*
pTasks
;
void
*
ahandle
;
TXN
txn
;
FTaskExpand
*
expandFunc
;
}
SStreamMeta
;
SStreamMeta
*
streamMetaOpen
();
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
);
void
streamMetaClose
(
SStreamMeta
*
streamMeta
);
int32_t
streamMetaAddTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
);
...
...
source/client/src/tmq.c
浏览文件 @
75297e3a
...
...
@@ -1699,7 +1699,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
tmq_list_destroy
(
lst
);
return
rsp
;
/*return rsp;*/
return
0
;
}
// TODO: free resources
return
0
;
...
...
source/common/src/tglobal.c
浏览文件 @
75297e3a
...
...
@@ -401,7 +401,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeWriteThreads
=
TMAX
(
tsNumOfVnodeWriteThreads
,
1
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeWriteThreads"
,
tsNumOfVnodeWriteThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeSyncThreads
=
tsNumOfCores
;
// tsNumOfVnodeSyncThreads = tsNumOfCores;
tsNumOfVnodeSyncThreads
=
32
;
tsNumOfVnodeSyncThreads
=
TMAX
(
tsNumOfVnodeSyncThreads
,
1
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeSyncThreads"
,
tsNumOfVnodeSyncThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
75297e3a
...
...
@@ -604,11 +604,11 @@ typedef struct {
int64_t
createTime
;
int64_t
updateTime
;
int32_t
version
;
int32_t
totalLevel
;
int64_t
smaId
;
// 0 for unused
// info
int64_t
uid
;
int8_t
status
;
int8_t
isDistributed
;
// config
int8_t
igExpired
;
int8_t
trigger
;
...
...
@@ -647,7 +647,6 @@ typedef struct {
typedef struct {
int64_t uid;
int64_t streamId;
int8_t isDistributed;
int8_t status;
int8_t stage;
} SStreamRecoverObj;
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
75297e3a
...
...
@@ -23,11 +23,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeI64
(
pEncoder
,
pObj
->
createTime
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
updateTime
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
version
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
totalLevel
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
isDistributed
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
igExpired
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
trigger
)
<
0
)
return
-
1
;
...
...
@@ -69,11 +69,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
createTime
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
updateTime
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
version
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
totalLevel
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
isDistributed
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
igExpired
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
trigger
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
75297e3a
...
...
@@ -307,10 +307,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
int32_t
totLevel
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
ASSERT
(
totLevel
<=
2
);
pStream
->
tasks
=
taosArrayInit
(
totLevel
,
sizeof
(
void
*
));
pStream
->
isDistributed
=
totLevel
==
2
;
int32_t
planTotLevel
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
ASSERT
(
planTotLevel
<=
2
);
pStream
->
tasks
=
taosArrayInit
(
planTotLevel
,
sizeof
(
void
*
));
bool
hasExtraSink
=
false
;
bool
externalTargetDB
=
strcmp
(
pStream
->
sourceDb
,
pStream
->
targetDb
)
!=
0
;
...
...
@@ -320,7 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool
multiTarget
=
pDbObj
->
cfg
.
numOfVgroups
>
1
;
if
(
t
otLevel
==
2
||
externalTargetDB
||
multiTarget
)
{
if
(
planT
otLevel
==
2
||
externalTargetDB
||
multiTarget
)
{
/*if (true) {*/
SArray
*
taskOneLevel
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
taosArrayPush
(
pStream
->
tasks
,
&
taskOneLevel
);
...
...
@@ -338,8 +337,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
}
}
pStream
->
totalLevel
=
planTotLevel
+
hasExtraSink
;
if
(
t
otLevel
>
1
)
{
if
(
planT
otLevel
>
1
)
{
SStreamTask
*
pInnerTask
;
// inner level
{
...
...
@@ -371,13 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return
-
1
;
}
#if 0
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pSourceDb);
pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups;
#endif
if
(
tsSchedStreamToSnode
)
{
SSnodeObj
*
pSnode
=
mndSchedFetchOneSnode
(
pMnode
);
if
(
pSnode
==
NULL
)
{
...
...
@@ -464,7 +457,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
}
if
(
t
otLevel
==
1
)
{
if
(
planT
otLevel
==
1
)
{
SArray
*
taskOneLevel
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
taosArrayPush
(
pStream
->
tasks
,
&
taskOneLevel
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
75297e3a
...
...
@@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static
int32_t
mndStreamActionUpdate
(
SSdb
*
pSdb
,
SStreamObj
*
pStream
,
SStreamObj
*
pNewStream
);
static
int32_t
mndProcessCreateStreamReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropStreamReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessRecoverStreamReq
(
SRpcMsg
*
pReq
);
/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/
static
int32_t
mndProcessStreamMetaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndGetStreamMeta
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveStream
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
...
...
@@ -55,7 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_STREAM
,
mndProcessCreateStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_STREAM
,
mndProcessDropStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_RECOVER_STREAM
,
mndProcessRecoverStreamReq
);
/*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_DEPLOY_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_DROP_RSP
,
mndTransProcessRsp
);
...
...
@@ -540,6 +540,7 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
return
0
;
}
#if 0
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (pStream->isDistributed) {
int32_t lv = taosArrayGetSize(pStream->tasks);
...
...
@@ -573,6 +574,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
}
return 0;
}
#endif
int32_t
mndDropStreamTasks
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
)
{
int32_t
lv
=
taosArrayGetSize
(
pStream
->
tasks
);
...
...
@@ -755,6 +757,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
#if 0
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
...
...
@@ -817,6 +820,7 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
#endif
int32_t
mndDropStreamByDb
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
浏览文件 @
75297e3a
...
...
@@ -115,12 +115,19 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
TSDBROW
row
=
tsdbRowFromBlockData
(
&
pReader
->
oBlockData
,
iRow
);
int64_t
version
=
TSDBROW_VERSION
(
&
row
);
tsdbTrace
(
"vgId:%d, vnode snapshot tsdb read for %s, %"
PRId64
"(%"
PRId64
" , %"
PRId64
")"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
pReader
->
pTsdb
->
path
,
version
,
pReader
->
sver
,
pReader
->
ever
);
if
(
version
<
pReader
->
sver
||
version
>
pReader
->
ever
)
continue
;
code
=
tBlockDataAppendRow
(
&
pReader
->
nBlockData
,
&
row
,
NULL
);
if
(
code
)
goto
_err
;
}
if
(
pReader
->
nBlockData
.
nRow
<=
0
)
{
continue
;
}
// org data
// compress data (todo)
int32_t
size
=
sizeof
(
TABLEID
)
+
tPutBlockData
(
NULL
,
&
pReader
->
nBlockData
);
...
...
@@ -808,7 +815,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
if
(
code
)
goto
_err
;
_exit:
tsdbDebug
(
"vgId:%d, vnode snapshot tsdb write data impl for %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pWriter
->
pTsdb
->
path
);
tsdbDebug
(
"vgId:%d, vnode snapshot tsdb write data impl for %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pWriter
->
pTsdb
->
path
);
return
code
;
_err:
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
75297e3a
...
...
@@ -722,10 +722,12 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool
vnodeIsLeader
(
SVnode
*
pVnode
)
{
if
(
!
syncIsReady
(
pVnode
->
sync
))
{
vDebug
(
"vgId:%d, vnode not ready"
,
pVnode
->
config
.
vgId
);
return
false
;
}
if
(
!
pVnode
->
restored
)
{
vDebug
(
"vgId:%d, vnode not restored"
,
pVnode
->
config
.
vgId
);
terrno
=
TSDB_CODE_APP_NOT_READY
;
return
false
;
}
...
...
source/libs/stream/CMakeLists.txt
浏览文件 @
75297e3a
...
...
@@ -8,7 +8,8 @@ target_include_directories(
target_link_libraries
(
stream
PRIVATE os util transport qcom executor tdb
PUBLIC tdb
PRIVATE os util transport qcom executor
)
if
(
${
BUILD_TEST
}
)
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
75297e3a
...
...
@@ -14,22 +14,8 @@
*/
#include "executor.h"
#include "tdbInt.h"
#include "tstream.h"
typedef
int32_t
FTaskExpand
(
void
*
ahandle
,
SStreamTask
*
pTask
);
typedef
struct
SStreamMeta
{
char
*
path
;
TDB
*
db
;
TTB
*
pTaskDb
;
TTB
*
pStateDb
;
SHashObj
*
pTasks
;
void
*
ahandle
;
TXN
txn
;
FTaskExpand
*
expandFunc
;
}
SStreamMeta
;
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
)
{
SStreamMeta
*
pMeta
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamMeta
));
if
(
pMeta
==
NULL
)
{
...
...
@@ -150,7 +136,7 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
return
0
;
}
int32_t
stream
RestoreTask
(
SStreamMeta
*
pMeta
)
{
int32_t
stream
LoadTasks
(
SStreamMeta
*
pMeta
)
{
TBC
*
pCur
=
NULL
;
if
(
tdbTbcOpen
(
pMeta
->
pTaskDb
,
&
pCur
,
NULL
)
<
0
)
{
ASSERT
(
0
);
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
75297e3a
...
...
@@ -87,53 +87,80 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
return
0
;
}
int32_t
streamProcessFailRecoverReq
(
SStreamTask
*
pTask
,
SMStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
#if 0
if (pTask->taskStatus != TASK_STATUS__FAIL) {
return 0;
typedef
struct
{
int32_t
vgId
;
int32_t
childId
;
int64_t
ver
;
}
SStreamVgVerCheckpoint
;
int32_t
tEncodeSStreamVgVerCheckpoint
(
SEncoder
*
pEncoder
,
const
SStreamVgVerCheckpoint
*
pCheckpoint
)
{
if
(
tEncodeI32
(
pEncoder
,
pCheckpoint
->
vgId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pCheckpoint
->
childId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pCheckpoint
->
ver
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tDecodeSStreamVgVerCheckpoint
(
SDecoder
*
pDecoder
,
SStreamVgVerCheckpoint
*
pCheckpoint
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pCheckpoint
->
vgId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pCheckpoint
->
childId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pCheckpoint
->
ver
)
<
0
)
return
-
1
;
return
0
;
}
typedef
struct
{
int64_t
streamId
;
int64_t
checkTs
;
int64_t
checkpointId
;
int32_t
taskId
;
SArray
*
checkpointVer
;
// SArray<SStreamVgCheckpointVer>
}
SStreamAggVerCheckpoint
;
int32_t
tEncodeSStreamAggVerCheckpoint
(
SEncoder
*
pEncoder
,
const
SStreamAggVerCheckpoint
*
pCheckpoint
)
{
if
(
tEncodeI64
(
pEncoder
,
pCheckpoint
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pCheckpoint
->
checkTs
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pCheckpoint
->
checkpointId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pCheckpoint
->
taskId
)
<
0
)
return
-
1
;
int32_t
sz
=
taosArrayGetSize
(
pCheckpoint
->
checkpointVer
);
if
(
tEncodeI32
(
pEncoder
,
sz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SStreamVgVerCheckpoint
*
pOneVgCkpoint
=
taosArrayGet
(
pCheckpoint
->
checkpointVer
,
i
);
if
(
tEncodeSStreamVgVerCheckpoint
(
pEncoder
,
pOneVgCkpoint
)
<
0
)
return
-
1
;
}
return
0
;
}
if (pTask->isStreamDistributed) {
if (pTask->taskType == TASK_TYPE__SOURCE) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
} else if (pTask->taskType != TASK_TYPE__SINK) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
bool hasCheckpoint = false;
int32_t childSz = taosArrayGetSize(pTask->childEpInfo);
for (int32_t i = 0; i < childSz; i++) {
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
if (pEpInfo->checkpointVer == -1) {
hasCheckpoint = true;
break;
}
}
if (hasCheckpoint) {
// load from checkpoint
} else {
// recover child
}
}
} else {
if (pTask->taskType == TASK_TYPE__SOURCE) {
if (pTask->checkpointVer != -1) {
// load from checkpoint
} else {
// reset stream query task info
// TODO get snapshot ver
pTask->recoverSnapVer = -1;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
pTask->taskStatus = TASK_STATUS__RECOVERING;
}
}
int32_t
tDecodeSStreamAggVerCheckpoint
(
SDecoder
*
pDecoder
,
SStreamAggVerCheckpoint
*
pCheckpoint
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pCheckpoint
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pCheckpoint
->
checkTs
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pCheckpoint
->
checkpointId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pCheckpoint
->
taskId
)
<
0
)
return
-
1
;
int32_t
sz
;
if
(
tDecodeI32
(
pDecoder
,
&
sz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SStreamVgVerCheckpoint
oneVgCheckpoint
;
if
(
tDecodeSStreamVgVerCheckpoint
(
pDecoder
,
&
oneVgCheckpoint
)
<
0
)
return
-
1
;
taosArrayPush
(
pCheckpoint
->
checkpointVer
,
&
oneVgCheckpoint
);
}
return
0
;
}
if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
if (streamPipelineExec(pTask, 100) < 0) {
// set fail
return -1;
}
int32_t
streamRecoverSinkLevel
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
);
// load status
void
*
pVal
=
NULL
;
int32_t
vLen
=
0
;
if
(
tdbTbGet
(
pMeta
->
pStateDb
,
&
pTask
->
taskId
,
sizeof
(
void
*
),
&
pVal
,
&
vLen
)
<
0
)
{
return
-
1
;
}
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
pVal
,
vLen
);
SStreamAggVerCheckpoint
aggCheckpoint
;
tDecodeSStreamAggVerCheckpoint
(
&
decoder
,
&
aggCheckpoint
);
/*pTask->*/
return
0
;
}
#endif
int32_t
streamRecoverTask
(
SStreamTask
*
pTask
)
{
//
return
0
;
}
source/libs/stream/src/streamTask.c
浏览文件 @
75297e3a
...
...
@@ -52,6 +52,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if
(
tEncodeI64
(
pEncoder
,
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
totalLevel
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
taskLevel
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
outputType
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
...
...
@@ -62,7 +63,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeI32
(
pEncoder
,
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
/*if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;*/
int32_t
epSz
=
taosArrayGetSize
(
pTask
->
childEpInfo
);
if
(
tEncodeI32
(
pEncoder
,
epSz
)
<
0
)
return
-
1
;
...
...
@@ -101,6 +101,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
totalLevel
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
taskLevel
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
outputType
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
...
...
@@ -111,7 +112,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
/*if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;*/
int32_t
epSz
;
if
(
tDecodeI32
(
pDecoder
,
&
epSz
)
<
0
)
return
-
1
;
...
...
source/libs/sync/inc/syncRaftLog.h
浏览文件 @
75297e3a
...
...
@@ -47,6 +47,8 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore);
SyncIndex
logStoreFirstIndex
(
SSyncLogStore
*
pLogStore
);
SyncIndex
logStoreWalCommitVer
(
SSyncLogStore
*
pLogStore
);
// for debug
void
logStorePrint
(
SSyncLogStore
*
pLogStore
);
void
logStorePrint2
(
char
*
s
,
SSyncLogStore
*
pLogStore
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
75297e3a
...
...
@@ -357,16 +357,14 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
delBegin
);
ASSERT
(
code
==
0
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"log truncate, from %"
PRId64
" to %"
PRId64
,
delBegin
,
delEnd
);
syncNodeEventLog
(
ths
,
eventLog
);
logStoreSimpleLog2
(
"after syncNodeMakeLogSame"
,
ths
->
pLogStore
);
return
code
;
}
// if FromIndex > walCommitVer, return 0
// else return num of pass entries
static
int32_t
syncNodeDoMakeLogSame
(
SSyncNode
*
ths
,
SyncIndex
FromIndex
)
{
int32_t
code
;
int32_t
code
=
0
;
int32_t
pass
=
0
;
SyncIndex
delBegin
=
FromIndex
;
SyncIndex
delEnd
=
ths
->
pLogStore
->
syncLogLastIndex
(
ths
->
pLogStore
);
...
...
@@ -398,16 +396,31 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
}
}
// update delete begin
SyncIndex
walCommitVer
=
logStoreWalCommitVer
(
ths
->
pLogStore
);
if
(
delBegin
<=
walCommitVer
)
{
delBegin
=
walCommitVer
+
1
;
pass
=
walCommitVer
-
delBegin
+
1
;
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"update delete begin to %ld"
,
delBegin
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
}
// delete confict entries
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
delBegin
);
ASSERT
(
code
==
0
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"log truncate, from %"
PRId64
" to %"
PRId64
,
delBegin
,
delEnd
);
syncNodeEventLog
(
ths
,
eventLog
);
logStoreSimpleLog2
(
"after syncNodeMakeLogSame"
,
ths
->
pLogStore
);
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"make log same from:%ld, delbegin:%ld, pass:%d"
,
FromIndex
,
delBegin
,
pass
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
return
code
;
return
pass
;
}
int32_t
syncNodePreCommit
(
SSyncNode
*
ths
,
SSyncRaftEntry
*
pEntry
,
int32_t
code
)
{
...
...
@@ -543,31 +556,34 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
SOffsetAndContLen
*
metaTableArr
=
syncAppendEntriesBatchMetaTableArray
(
pMsg
);
if
(
hasAppendEntries
&&
pMsg
->
prevLogIndex
==
ths
->
commitIndex
)
{
// make log same
do
{
SyncIndex
logLastIndex
=
ths
->
pLogStore
->
syncLogLastIndex
(
ths
->
pLogStore
);
bool
hasExtraEntries
=
logLastIndex
>
pMsg
->
prevLogIndex
;
int32_t
pass
=
0
;
SyncIndex
logLastIndex
=
ths
->
pLogStore
->
syncLogLastIndex
(
ths
->
pLogStore
);
bool
hasExtraEntries
=
logLastIndex
>
pMsg
->
prevLogIndex
;
if
(
hasExtraEntries
)
{
// make log same, rollback deleted entries
code
=
syncNodeDoMakeLogSame
(
ths
,
pMsg
->
prevLogIndex
+
1
);
ASSERT
(
code
==
0
);
}
}
while
(
0
);
// make log same
if
(
hasExtraEntries
)
{
// make log same, rollback deleted entries
pass
=
syncNodeDoMakeLogSame
(
ths
,
pMsg
->
prevLogIndex
+
1
);
ASSERT
(
pass
>=
0
);
}
// append entry batch
for
(
int32_t
i
=
0
;
i
<
pMsg
->
dataCount
;
++
i
)
{
SSyncRaftEntry
*
pAppendEntry
=
(
SSyncRaftEntry
*
)(
pMsg
->
data
+
metaTableArr
[
i
].
offset
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
if
(
code
!=
0
)
{
return
-
1
;
}
if
(
pass
==
0
)
{
// assert! no batch
ASSERT
(
pMsg
->
dataCount
<=
1
);
for
(
int32_t
i
=
0
;
i
<
pMsg
->
dataCount
;
++
i
)
{
SSyncRaftEntry
*
pAppendEntry
=
(
SSyncRaftEntry
*
)(
pMsg
->
data
+
metaTableArr
[
i
].
offset
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
if
(
code
!=
0
)
{
return
-
1
;
}
code
=
syncNodePreCommit
(
ths
,
pAppendEntry
,
0
);
ASSERT
(
code
==
0
);
code
=
syncNodePreCommit
(
ths
,
pAppendEntry
,
0
);
ASSERT
(
code
==
0
);
// syncEntryDestory(pAppendEntry);
// syncEntryDestory(pAppendEntry);
}
}
// fsync once
...
...
@@ -670,25 +686,33 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncLogRecvAppendEntriesBatch
(
ths
,
pMsg
,
"really match"
);
int32_t
pass
=
0
;
if
(
hasExtraEntries
)
{
// make log same, rollback deleted entries
code
=
syncNodeDoMakeLogSame
(
ths
,
pMsg
->
prevLogIndex
+
1
);
ASSERT
(
code
=
=
0
);
pass
=
syncNodeDoMakeLogSame
(
ths
,
pMsg
->
prevLogIndex
+
1
);
ASSERT
(
pass
>
=
0
);
}
if
(
hasAppendEntries
)
{
// append entry batch
for
(
int32_t
i
=
0
;
i
<
pMsg
->
dataCount
;
++
i
)
{
SSyncRaftEntry
*
pAppendEntry
=
(
SSyncRaftEntry
*
)(
pMsg
->
data
+
metaTableArr
[
i
].
offset
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
if
(
code
!=
0
)
{
return
-
1
;
}
if
(
pass
==
0
)
{
// assert! no batch
ASSERT
(
pMsg
->
dataCount
<=
1
);
// append entry batch
for
(
int32_t
i
=
0
;
i
<
pMsg
->
dataCount
;
++
i
)
{
SSyncRaftEntry
*
pAppendEntry
=
(
SSyncRaftEntry
*
)(
pMsg
->
data
+
metaTableArr
[
i
].
offset
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
if
(
code
!=
0
)
{
return
-
1
;
}
code
=
syncNodePreCommit
(
ths
,
pAppendEntry
,
0
);
ASSERT
(
code
==
0
);
code
=
syncNodePreCommit
(
ths
,
pAppendEntry
,
0
);
ASSERT
(
code
==
0
);
// syncEntryDestory(pAppendEntry);
// syncEntryDestory(pAppendEntry);
}
}
// fsync once
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
75297e3a
...
...
@@ -92,6 +92,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
}
// advance commit index as large as possible
SyncIndex
walCommitVer
=
logStoreWalCommitVer
(
pSyncNode
->
pLogStore
);
if
(
walCommitVer
>
newCommitIndex
)
{
newCommitIndex
=
walCommitVer
;
}
// maybe execute fsm
if
(
newCommitIndex
>
pSyncNode
->
commitIndex
)
{
SyncIndex
beginIndex
=
pSyncNode
->
commitIndex
+
1
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
75297e3a
...
...
@@ -2409,6 +2409,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
static
void
syncNodeEqHeartbeatTimer
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
syncNodeEventLog
(
pSyncNode
,
"eq hb timer"
);
if
(
pSyncNode
->
replicaNum
>
1
)
{
if
(
atomic_load_64
(
&
pSyncNode
->
heartbeatTimerLogicClockUser
)
<=
atomic_load_64
(
&
pSyncNode
->
heartbeatTimerLogicClock
))
{
...
...
@@ -2665,7 +2668,22 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
syncNodeEventLog
(
ths
,
"I am not follower, can not do leader transfer"
);
return
0
;
}
syncNodeEventLog
(
ths
,
"do leader transfer"
);
if
(
!
ths
->
restoreFinish
)
{
syncNodeEventLog
(
ths
,
"restore not finish, can not do leader transfer"
);
return
0
;
}
if
(
ths
->
vgId
>
1
)
{
syncNodeEventLog
(
ths
,
"I am vnode, can not do leader transfer"
);
return
0
;
}
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"do leader transfer, index:%ld"
,
pEntry
->
index
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
bool
sameId
=
syncUtilSameId
(
&
(
pSyncLeaderTransfer
->
newLeaderId
),
&
(
ths
->
myRaftId
));
bool
sameNodeInfo
=
strcmp
(
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
ths
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
75297e3a
...
...
@@ -305,10 +305,18 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
return
code
;
}
// truncate semantic
static
int32_t
raftLogTruncate
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
fromIndex
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
int32_t
code
=
walRollback
(
pWal
,
fromIndex
);
// need not truncate
SyncIndex
wallastVer
=
walGetLastVer
(
pWal
);
if
(
fromIndex
>
wallastVer
)
{
return
0
;
}
int32_t
code
=
walRollback
(
pWal
,
fromIndex
);
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
...
...
@@ -323,7 +331,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
// event log
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"
wal
truncate, from-index:%"
PRId64
,
fromIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"
log
truncate, from-index:%"
PRId64
,
fromIndex
);
syncNodeEventLog
(
pData
->
pSyncNode
,
logBuf
);
}
while
(
0
);
...
...
@@ -637,6 +645,12 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
return
walGetFirstVer
(
pWal
);
}
SyncIndex
logStoreWalCommitVer
(
SSyncLogStore
*
pLogStore
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
return
walGetCommittedVer
(
pWal
);
}
// for debug -----------------
void
logStorePrint
(
SSyncLogStore
*
pLogStore
)
{
char
*
serialized
=
logStore2Str
(
pLogStore
);
...
...
source/libs/transport/src/.transSvr.c.swo
已删除
100644 → 0
浏览文件 @
6891c40c
文件已删除
source/libs/transport/src/transCli.c
浏览文件 @
75297e3a
...
...
@@ -482,6 +482,7 @@ void cliReadTimeoutCb(uv_timer_t* handle) {
// set up timeout cb
SCliConn
*
conn
=
handle
->
data
;
tTrace
(
"%s conn %p timeout, ref:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
T_REF_VAL_GET
(
conn
));
uv_read_stop
(
conn
->
stream
);
cliHandleExceptImpl
(
conn
,
TSDB_CODE_RPC_TIMEOUT
);
}
...
...
@@ -993,6 +994,8 @@ static void cliAsyncCb(uv_async_t* handle) {
if
(
count
>=
2
)
{
tTrace
(
"cli process batch size:%d"
,
count
);
}
// if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb);
if
(
pThrd
->
stopMsg
!=
NULL
)
cliHandleQuit
(
pThrd
->
stopMsg
,
pThrd
);
}
static
void
cliPrepareCb
(
uv_prepare_t
*
handle
)
{
...
...
@@ -1088,7 +1091,7 @@ static SCliThrd* createThrdObj() {
pThrd
->
prepare
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_prepare_t
));
uv_prepare_init
(
pThrd
->
loop
,
pThrd
->
prepare
);
pThrd
->
prepare
->
data
=
pThrd
;
uv_prepare_start
(
pThrd
->
prepare
,
cliPrepareCb
);
//
uv_prepare_start(pThrd->prepare, cliPrepareCb);
int32_t
timerSize
=
512
;
pThrd
->
timerList
=
taosArrayInit
(
timerSize
,
sizeof
(
void
*
));
...
...
@@ -1125,7 +1128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree
(
timer
);
}
taosArrayDestroy
(
pThrd
->
timerList
);
taosMemoryFree
(
pThrd
->
prepare
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录