Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b6bddd3f
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b6bddd3f
编写于
6月 15, 2022
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into szhou/feature/multi-tb-merge-scan
上级
040093be
fd9bb8db
变更
35
显示空白变更内容
内联
并排
Showing
35 changed file
with
425 addition
and
143 deletion
+425
-143
examples/c/stream_demo.c
examples/c/stream_demo.c
+2
-1
include/libs/executor/executor.h
include/libs/executor/executor.h
+6
-3
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+31
-1
include/libs/sync/sync.h
include/libs/sync/sync.h
+1
-9
include/util/taoserror.h
include/util/taoserror.h
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+84
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+2
-2
source/dnode/mnode/impl/inc/mndStream.h
source/dnode/mnode/impl/inc/mndStream.h
+1
-1
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+4
-4
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+10
-5
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+3
-0
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+3
-1
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+8
-9
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+9
-9
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+13
-8
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+6
-3
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+8
-19
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+1
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+13
-7
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+2
-2
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+18
-0
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+7
-0
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+67
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+8
-6
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+2
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+86
-39
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
+1
-1
source/libs/sync/test/syncConfigChangeTest.cpp
source/libs/sync/test/syncConfigChangeTest.cpp
+1
-1
source/libs/sync/test/syncReplicateTest.cpp
source/libs/sync/test/syncReplicateTest.cpp
+1
-1
source/libs/sync/test/syncTestTool.cpp
source/libs/sync/test/syncTestTool.cpp
+1
-1
source/util/src/tarray.c
source/util/src/tarray.c
+5
-1
source/util/src/terror.c
source/util/src/terror.c
+2
-0
tests/system-test/7-tmq/basic5.py
tests/system-test/7-tmq/basic5.py
+3
-1
tests/system-test/test-all.bat
tests/system-test/test-all.bat
+3
-2
tests/system-test/test.py
tests/system-test/test.py
+11
-4
未找到文件。
examples/c/stream_demo.c
浏览文件 @
b6bddd3f
...
...
@@ -82,7 +82,8 @@ int32_t create_stream() {
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)"
);
pConn
,
"create stream stream1 trigger max_delay 10s into outstb as select _wstartts, sum(k) from st1 interval(10m)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/libs/executor/executor.h
浏览文件 @
b6bddd3f
...
...
@@ -38,8 +38,10 @@ typedef struct SReadHandle {
SMsgCb
*
pMsgCb
;
}
SReadHandle
;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2
enum
{
STREAM_DATA_TYPE_SUBMIT_BLOCK
=
1
,
STREAM_DATA_TYPE_SSDATA_BLOCK
=
2
,
};
typedef
enum
{
OPTR_EXEC_MODEL_BATCH
=
0x1
,
...
...
@@ -102,7 +104,8 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @param tversion
* @return
*/
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
);
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
);
/**
* The main task execution function, including query on both table and multiple tables,
...
...
include/libs/stream/tstream.h
浏览文件 @
b6bddd3f
...
...
@@ -58,6 +58,7 @@ enum {
enum
{
STREAM_INPUT__DATA_SUBMIT
=
1
,
STREAM_INPUT__DATA_BLOCK
,
STREAM_INPUT__TRIGGER
,
STREAM_INPUT__CHECKPOINT
,
};
...
...
@@ -85,6 +86,11 @@ typedef struct {
int8_t
type
;
}
SStreamCheckpoint
;
typedef
struct
{
int8_t
type
;
SSDataBlock
*
pBlock
;
}
SStreamTrigger
;
enum
{
STREAM_QUEUE__SUCESS
=
1
,
STREAM_QUEUE__FAILED
,
...
...
@@ -98,6 +104,9 @@ typedef struct {
int8_t
status
;
}
SStreamQueue
;
int32_t
streamInit
();
void
streamCleanUp
();
SStreamQueue
*
streamQueueOpen
();
void
streamQueueClose
(
SStreamQueue
*
queue
);
...
...
@@ -220,6 +229,11 @@ enum {
TASK_INPUT_TYPE__DATA_BLOCK
,
};
enum
{
TASK_TRIGGER_STATUS__IN_ACTIVE
=
1
,
TASK_TRIGGER_STATUS__ACTIVE
,
};
struct
SStreamTask
{
int64_t
streamId
;
int32_t
taskId
;
...
...
@@ -262,8 +276,16 @@ struct SStreamTask {
SStreamQueue
*
inputQueue
;
SStreamQueue
*
outputQueue
;
// trigger
int8_t
triggerStatus
;
int64_t
triggerParam
;
void
*
timer
;
// application storage
// void* ahandle;
// msg handle
SMsgCb
*
pMsgCb
;
};
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
);
...
...
@@ -292,6 +314,13 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__CHECKPOINT
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__TRIGGER
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
if
(
pItem
->
type
!=
STREAM_INPUT__TRIGGER
&&
pItem
->
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
&&
pTask
->
triggerStatus
==
TASK_TRIGGER_STATUS__IN_ACTIVE
)
{
atomic_store_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__ACTIVE
);
}
// TODO: back pressure
...
...
@@ -370,7 +399,8 @@ typedef struct {
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
);
int32_t
streamTriggerByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
);
int32_t
streamLaunchByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
);
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
);
int32_t
streamTaskRun
(
SStreamTask
*
pTask
);
...
...
include/libs/sync/sync.h
浏览文件 @
b6bddd3f
...
...
@@ -44,14 +44,6 @@ typedef enum {
TAOS_SYNC_STATE_ERROR
=
103
,
}
ESyncState
;
typedef
enum
{
TAOS_SYNC_PROPOSE_SUCCESS
=
0
,
TAOS_SYNC_PROPOSE_NOT_LEADER
=
1
,
TAOS_SYNC_ONLY_ONE_REPLICA
=
2
,
TAOS_SYNC_NOT_IN_NEW_CONFIG
=
3
,
TAOS_SYNC_OTHER_ERROR
=
100
,
}
ESyncProposeCode
;
typedef
enum
{
TAOS_SYNC_FSM_CB_SUCCESS
=
0
,
TAOS_SYNC_FSM_CB_OTHER_ERROR
=
1
,
...
...
include/util/taoserror.h
浏览文件 @
b6bddd3f
...
...
@@ -411,6 +411,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
...
...
source/client/src/tmq.c
浏览文件 @
b6bddd3f
...
...
@@ -992,6 +992,90 @@ CREATE_MSG_FAIL:
return
-
1
;
}
bool
tmqUpdateEp2
(
tmq_t
*
tmq
,
int32_t
epoch
,
SMqAskEpRsp
*
pRsp
)
{
bool
set
=
false
;
int32_t
topicNumGet
=
taosArrayGetSize
(
pRsp
->
topics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
tscDebug
(
"consumer %ld update ep epoch %d to epoch %d, topic num: %d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
);
SArray
*
newTopics
=
taosArrayInit
(
topicNumGet
,
sizeof
(
SMqClientTopic
));
if
(
newTopics
==
NULL
)
{
return
false
;
}
SHashObj
*
pHash
=
taosHashInit
(
64
,
MurmurHash3_32
,
false
,
HASH_NO_LOCK
);
if
(
pHash
==
NULL
)
{
taosArrayDestroy
(
newTopics
);
return
false
;
}
int32_t
topicNumCur
=
taosArrayGetSize
(
tmq
->
clientTopics
);
for
(
int32_t
i
=
0
;
i
<
topicNumCur
;
i
++
)
{
// find old topic
SMqClientTopic
*
pTopicCur
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
if
(
pTopicCur
->
vgs
)
{
int32_t
vgNumCur
=
taosArrayGetSize
(
pTopicCur
->
vgs
);
tscDebug
(
"consumer %ld new vg num: %d"
,
tmq
->
consumerId
,
vgNumCur
);
if
(
vgNumCur
==
0
)
break
;
for
(
int32_t
j
=
0
;
j
<
vgNumCur
;
j
++
)
{
SMqClientVg
*
pVgCur
=
taosArrayGet
(
pTopicCur
->
vgs
,
j
);
sprintf
(
vgKey
,
"%s:%d"
,
pTopicCur
->
topicName
,
pVgCur
->
vgId
);
tscDebug
(
"consumer %ld epoch %d vg %d build %s"
,
tmq
->
consumerId
,
epoch
,
pVgCur
->
vgId
,
vgKey
);
taosHashPut
(
pHash
,
vgKey
,
strlen
(
vgKey
),
&
pVgCur
->
currentOffset
,
sizeof
(
int64_t
));
}
break
;
}
}
for
(
int32_t
i
=
0
;
i
<
topicNumGet
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
pRsp
->
topics
,
i
);
topic
.
schema
=
pTopicEp
->
schema
;
taosHashClear
(
pHash
);
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
tstrncpy
(
topic
.
db
,
pTopicEp
->
db
,
TSDB_DB_FNAME_LEN
);
tscDebug
(
"consumer %ld update topic: %s"
,
tmq
->
consumerId
,
topic
.
topicName
);
int32_t
vgNumGet
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
topic
.
vgs
=
taosArrayInit
(
vgNumGet
,
sizeof
(
SMqClientVg
));
for
(
int32_t
j
=
0
;
j
<
vgNumGet
;
j
++
)
{
SMqSubVgEp
*
pVgEp
=
taosArrayGet
(
pTopicEp
->
vgs
,
j
);
sprintf
(
vgKey
,
"%s:%d"
,
topic
.
topicName
,
pVgEp
->
vgId
);
int64_t
*
pOffset
=
taosHashGet
(
pHash
,
vgKey
,
strlen
(
vgKey
));
int64_t
offset
=
tmq
->
resetOffsetCfg
;
if
(
pOffset
!=
NULL
)
{
offset
=
*
pOffset
;
}
tscDebug
(
"consumer %ld(epoch %d) offset of vg %d updated to %ld"
,
tmq
->
consumerId
,
epoch
,
pVgEp
->
vgId
,
offset
);
SMqClientVg
clientVg
=
{
.
pollCnt
=
0
,
.
currentOffset
=
offset
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
,
.
vgStatus
=
TMQ_VG_STATUS__IDLE
,
.
vgSkipCnt
=
0
,
};
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
}
taosArrayPush
(
newTopics
,
&
topic
);
}
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
taosHashCleanup
(
pHash
);
tmq
->
clientTopics
=
newTopics
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__NO_TOPIC
);
else
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__READY
);
atomic_store_32
(
&
tmq
->
epoch
,
epoch
);
return
set
;
}
bool
tmqUpdateEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
SMqAskEpRsp
*
pRsp
)
{
/*printf("call update ep %d\n", epoch);*/
bool
set
=
false
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
b6bddd3f
...
...
@@ -554,8 +554,8 @@ typedef struct {
SVgObj
fixedSinkVg
;
int64_t
smaId
;
// 0 for unused
int8_t
trigger
;
int
32
_t
triggerParam
;
int64_t
water
M
ark
;
int
64
_t
triggerParam
;
int64_t
water
m
ark
;
char
*
sql
;
char
*
physicalPlan
;
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
...
...
source/dnode/mnode/impl/inc/mndStream.h
浏览文件 @
b6bddd3f
...
...
@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw
*
mndStreamActionEncode
(
SStreamObj
*
pStream
);
SSdbRow
*
mndStreamActionDecode
(
SSdbRaw
*
pRaw
);
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
int8_t
triggerType
,
int64_t
watermark
,
STrans
*
pTrans
);
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
b6bddd3f
...
...
@@ -33,8 +33,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeI8
(
pEncoder
,
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
createdBy
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tEncodeI
32
(
pEncoder
,
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
water
M
ark
)
<
0
)
return
-
1
;
if
(
tEncodeI
64
(
pEncoder
,
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
water
m
ark
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sql
)
<
0
)
return
-
1
;
...
...
@@ -85,8 +85,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
createdBy
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tDecodeI
32
(
pDecoder
,
&
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
water
M
ark
)
<
0
)
return
-
1
;
if
(
tDecodeI
64
(
pDecoder
,
&
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
water
m
ark
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
sql
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
b6bddd3f
...
...
@@ -380,17 +380,19 @@ void mndStop(SMnode *pMnode) {
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
int32_t
code
=
TAOS_SYNC_OTHER_ERROR
;
int32_t
code
=
0
;
if
(
!
syncEnvIsStart
())
{
mError
(
"failed to process sync msg:%p type:%s since syncEnv stop"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pMgmt
->
sync
);
if
(
pSyncNode
==
NULL
)
{
mError
(
"failed to process sync msg:%p type:%s since syncNode is null"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
char
logBuf
[
512
]
=
{
0
};
...
...
@@ -451,7 +453,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
tmsgSendRsp
(
&
rsp
);
}
else
{
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
TAOS_SYNC_OTHER_ERROR
;
code
=
-
1
;
}
}
else
{
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
...
...
@@ -492,10 +494,13 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
tmsgSendRsp
(
&
rsp
);
}
else
{
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
TAOS_SYNC_OTHER_ERROR
;
code
=
-
1
;
}
}
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
code
;
}
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
b6bddd3f
...
...
@@ -387,6 +387,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// input
pFinalTask
->
inputType
=
TASK_INPUT_TYPE__DATA_BLOCK
;
// trigger
pFinalTask
->
triggerParam
=
pStream
->
triggerParam
;
// dispatch
if
(
mndAddDispatcherToInnerTask
(
pMnode
,
pTrans
,
pStream
,
pFinalTask
)
<
0
)
{
qDestroyQueryPlan
(
pPlan
);
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
b6bddd3f
...
...
@@ -561,6 +561,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj
.
sql
=
pCreate
->
sql
;
streamObj
.
createdBy
=
STREAM_CREATED_BY__SMA
;
streamObj
.
smaId
=
smaObj
.
uid
;
streamObj
.
watermark
=
0
;
streamObj
.
trigger
=
STREAM_TRIGGER_AT_ONCE
;
if
(
mndAllocSmaVgroup
(
pMnode
,
pDb
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
{
mError
(
"sma:%s, failed to create since %s"
,
smaObj
.
name
,
terrstr
());
...
...
@@ -583,7 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if
(
mndSetUpdateSmaStbCommitLogs
(
pMnode
,
pTrans
,
pStb
)
!=
0
)
goto
_OVER
;
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if
(
mndSetCreateSmaVgroupRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
streamObj
.
fixedSinkVg
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
STREAM_TRIGGER_AT_ONCE
,
0
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
b6bddd3f
...
...
@@ -235,16 +235,15 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesNodeToString
((
SNode
*
)
pPlan
,
false
,
pStr
,
NULL
);
code
=
nodesNodeToString
((
SNode
*
)
pPlan
,
false
,
pStr
,
NULL
);
}
nodesDestroyNode
(
pAst
);
nodesDestroyNode
((
SNode
*
)
pPlan
);
nodesDestroyNode
((
SNode
*
)
pPlan
);
terrno
=
code
;
return
code
;
}
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
int8_t
triggerType
,
int64_t
watermark
,
STrans
*
pTrans
)
{
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
)
{
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
ast
,
&
pAst
)
<
0
)
{
...
...
@@ -258,7 +257,6 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
// free
nodesDestroyNode
(
pAst
);
#if 0
printf("|");
for (int i = 0; i < pStream->outputSchema.nCols; i++) {
...
...
@@ -268,7 +266,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
#endif
if
(
TSDB_CODE_SUCCESS
!=
mndStreamGetPlanString
(
ast
,
triggerType
,
watermark
,
&
pStream
->
physicalPlan
))
{
if
(
TSDB_CODE_SUCCESS
!=
mndStreamGetPlanString
(
ast
,
pStream
->
trigger
,
pStream
->
watermark
,
&
pStream
->
physicalPlan
))
{
mError
(
"topic:%s, failed to get plan since %s"
,
pStream
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -391,7 +389,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
streamObj
.
smaId
=
0
;
/*streamObj.physicalPlan = "";*/
streamObj
.
trigger
=
pCreate
->
triggerType
;
streamObj
.
waterMark
=
pCreate
->
watermark
;
streamObj
.
watermark
=
pCreate
->
watermark
;
streamObj
.
triggerParam
=
pCreate
->
maxDelay
;
if
(
streamObj
.
targetSTbName
[
0
])
{
pDb
=
mndAcquireDbByStb
(
pMnode
,
streamObj
.
targetSTbName
);
...
...
@@ -409,7 +408,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
}
mDebug
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
pCreate
->
name
);
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
p
Create
->
triggerType
,
pCreate
->
watermark
,
p
Trans
)
!=
0
)
{
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to add stream since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -566,7 +565,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
targetSTbName
,
true
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
water
M
ark
,
false
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
water
m
ark
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
trigger
,
false
);
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
b6bddd3f
...
...
@@ -234,9 +234,9 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
int32_t
code
=
syncPropose
(
pMgmt
->
sync
,
&
rsp
,
isWeak
);
if
(
code
==
0
)
{
tsem_wait
(
&
pMgmt
->
syncSem
);
}
else
if
(
code
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
else
if
(
code
==
TAOS_SYNC_OTHER
_ERROR
)
{
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN_INTERNAL
_ERROR
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
else
{
terrno
=
TSDB_CODE_APP_ERROR
;
...
...
@@ -257,13 +257,13 @@ void mndSyncStart(SMnode *pMnode) {
syncStart
(
pMgmt
->
sync
);
mDebug
(
"mnode sync started, id:%"
PRId64
" standby:%d"
,
pMgmt
->
sync
,
pMgmt
->
standby
);
/*
/*
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
*/
*/
}
void
mndSyncStop
(
SMnode
*
pMnode
)
{}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
b6bddd3f
...
...
@@ -375,6 +375,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if
(
pTask
->
inputQueue
==
NULL
||
pTask
->
outputQueue
==
NULL
)
goto
FAIL
;
pTask
->
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
;
// exec
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
// expand runners
...
...
@@ -406,9 +408,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
tdGetSTSChemaFromSSChema
(
&
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
,
pTask
->
tbSink
.
pSchemaWrapper
->
nCols
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
}
streamSetupTrigger
(
pTask
);
tqInfo
(
"deploy stream task id %d child id %d on vg %d"
,
pTask
->
taskId
,
pTask
->
childId
,
pTq
->
pVnode
->
config
.
vgId
);
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
&
pTask
,
sizeof
(
void
*
));
return
0
;
FAIL:
...
...
@@ -431,7 +436,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
SStreamTask
*
pTask
=
*
(
SStreamTask
*
*
)
pIter
;
if
(
pTask
->
inputType
!=
STREAM_INPUT__DATA_SUBMIT
)
continue
;
if
(
!
failed
)
{
...
...
@@ -439,7 +444,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
continue
;
}
if
(
stream
Trigger
ByWrite
(
pTask
,
pTq
->
pVnode
->
config
.
vgId
,
&
pTq
->
pVnode
->
msgCb
)
<
0
)
{
if
(
stream
Launch
ByWrite
(
pTask
,
pTq
->
pVnode
->
config
.
vgId
,
&
pTq
->
pVnode
->
msgCb
)
<
0
)
{
continue
;
}
}
else
{
...
...
@@ -459,7 +464,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
//
SStreamTaskRunReq
*
pReq
=
pMsg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamTaskProcessRunReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
);
return
0
;
}
...
...
@@ -473,7 +478,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderInit
(
&
decoder
,
msgBody
,
msgLen
);
tDecodeStreamDispatchReq
(
&
decoder
,
&
req
);
int32_t
taskId
=
req
.
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SRpcMsg
rsp
=
{
.
info
=
pMsg
->
info
,
.
code
=
0
,
...
...
@@ -485,7 +490,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t
tqProcessTaskRecoverReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamTaskRecoverReq
*
pReq
=
pMsg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamProcessRecoverReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
return
0
;
}
...
...
@@ -493,7 +498,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t
tqProcessTaskDispatchRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamDispatchRsp
*
pRsp
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamProcessDispatchRsp
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pRsp
);
return
0
;
}
...
...
@@ -501,7 +506,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t
tqProcessTaskRecoverRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamTaskRecoverRsp
*
pRsp
=
pMsg
->
pCont
;
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamProcessRecoverRsp
(
pTask
,
pRsp
);
return
0
;
}
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
b6bddd3f
...
...
@@ -296,7 +296,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
}
int32_t
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
ret
=
TAOS_SYNC_OTHER_ERROR
;
int32_t
ret
=
0
;
if
(
syncEnvIsStart
())
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pVnode
->
sync
);
...
...
@@ -381,15 +381,18 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tmsgSendRsp
(
&
rsp
);
}
else
{
vError
(
"==vnodeProcessSyncReq== error msg type:%d"
,
pRpcMsg
->
msgType
);
ret
=
TAOS_SYNC_OTHER_ERROR
;
ret
=
-
1
;
}
syncNodeRelease
(
pSyncNode
);
}
else
{
vError
(
"==vnodeProcessSyncReq== error syncEnv stop"
);
ret
=
TAOS_SYNC_OTHER_ERROR
;
ret
=
-
1
;
}
if
(
ret
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
ret
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
b6bddd3f
...
...
@@ -98,7 +98,8 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
if
(
code
==
0
)
{
vnodeAccumBlockMsg
(
pVnode
,
pMsg
->
msgType
);
}
else
if
(
code
==
TAOS_SYNC_PROPOSE_NOT_LEADER
)
{
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN_NOT_LEADER
)
{
SEpSet
newEpSet
=
{
0
};
syncGetEpSet
(
pVnode
->
sync
,
&
newEpSet
);
SEp
*
pEp
=
&
newEpSet
.
eps
[
newEpSet
.
inUse
];
...
...
@@ -247,29 +248,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
int32_t
vnodeSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
int32_t
vnodeSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
int32_t
vnodeSnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
int32_t
vnodeSnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
int32_t
vnodeSnapshotDoRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
return
0
;
}
int32_t
vnodeSnapshotDoRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
return
0
;
}
int32_t
vnodeSnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
**
ppWriter
)
{
return
0
;
}
int32_t
vnodeSnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
**
ppWriter
)
{
return
0
;
}
int32_t
vnodeSnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
return
0
;
}
int32_t
vnodeSnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
return
0
;
}
int32_t
vnodeSnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
return
0
;
}
int32_t
vnodeSnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
return
0
;
}
static
SSyncFSM
*
vnodeSyncMakeFsm
(
SVnode
*
pVnode
)
{
SSyncFSM
*
pFsm
=
taosMemoryCalloc
(
1
,
sizeof
(
SSyncFSM
));
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
b6bddd3f
...
...
@@ -1899,7 +1899,7 @@ _return:
void
ctgUpdateThreadUnexpectedStopped
(
void
)
{
if
(
CTG_IS_LOCKED
(
&
gCtgMgmt
.
lock
)
>
0
)
CTG_UNLOCK
(
CTG_READ
,
&
gCtgMgmt
.
lock
);
if
(
!
atomic_load_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
)
&&
CTG_IS_LOCKED
(
&
gCtgMgmt
.
lock
)
>
0
)
CTG_UNLOCK
(
CTG_READ
,
&
gCtgMgmt
.
lock
);
}
void
ctgCleanupCacheQueue
(
void
)
{
...
...
source/libs/executor/src/executor.c
浏览文件 @
b6bddd3f
...
...
@@ -19,7 +19,8 @@
#include "tdatablock.h"
#include "vnode.h"
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
bool
assignUid
,
char
*
id
)
{
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
bool
assignUid
,
char
*
id
)
{
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
...
...
@@ -43,6 +44,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
if
(
pInfo
->
blockType
==
0
)
{
pInfo
->
blockType
=
type
;
}
else
if
(
pInfo
->
blockType
!=
type
)
{
ASSERT
(
0
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
...
...
@@ -51,7 +53,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
qError
(
"submit msg messed up when initing stream block, %s"
PRIx64
,
id
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
else
{
}
else
if
(
type
==
STREAM_DATA_TYPE_SSDATA_BLOCK
)
{
for
(
int32_t
i
=
0
;
i
<
numOfBlocks
;
++
i
)
{
SSDataBlock
*
pDataBlock
=
&
((
SSDataBlock
*
)
input
)[
i
];
...
...
@@ -62,6 +64,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll
(
p
->
pDataBlock
,
pDataBlock
->
pDataBlock
);
taosArrayPush
(
pInfo
->
pBlockLists
,
&
p
);
}
}
else
{
ASSERT
(
0
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -83,7 +87,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
(
void
**
)
pBlocks
,
numOfBlocks
,
type
,
assignUid
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
(
void
**
)
pBlocks
,
numOfBlocks
,
type
,
assignUid
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s failed to set the stream block data"
,
GET_TASKID
(
pTaskInfo
));
}
else
{
...
...
@@ -178,9 +183,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return
code
;
}
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
ASSERT
(
tinfo
!=
NULL
&&
dbName
!=
NULL
&&
tableName
!=
NULL
);
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
*
sversion
=
pTaskInfo
->
schemaVer
.
sversion
;
*
tversion
=
pTaskInfo
->
schemaVer
.
tversion
;
...
...
source/libs/function/src/builtins.c
浏览文件 @
b6bddd3f
...
...
@@ -1682,7 +1682,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"top"
,
.
type
=
FUNCTION_TYPE_TOP
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS_FUNC
|
FUNC_MGT_FORBID_STREAM_FUNC
,
.
translateFunc
=
translateTopBot
,
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
topBotFunctionSetup
,
...
...
@@ -1717,7 +1717,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"bottom"
,
.
type
=
FUNCTION_TYPE_BOTTOM
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS_FUNC
|
FUNC_MGT_FORBID_STREAM_FUNC
,
.
translateFunc
=
translateTopBot
,
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
topBotFunctionSetup
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
b6bddd3f
...
...
@@ -2385,6 +2385,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
int32_t
type
=
pInputCol
->
info
.
type
;
int32_t
bytes
=
pInputCol
->
info
.
bytes
;
pInfo
->
bytes
=
bytes
;
...
...
@@ -2421,6 +2422,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
TSKEY
cts
=
getRowPTs
(
pInput
->
pPTS
,
i
);
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
pInfo
->
buf
+
bytes
)
>
cts
)
{
if
(
IS_VAR_DATA_TYPE
(
type
))
{
bytes
=
varDataTLen
(
data
);
pInfo
->
bytes
=
bytes
;
}
memcpy
(
pInfo
->
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
pInfo
->
buf
+
bytes
)
=
cts
;
pInfo
->
hasResult
=
true
;
...
...
@@ -2451,6 +2456,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
TSKEY
cts
=
getRowPTs
(
pInput
->
pPTS
,
i
);
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
pInfo
->
buf
+
bytes
)
>
cts
)
{
if
(
IS_VAR_DATA_TYPE
(
type
))
{
bytes
=
varDataTLen
(
data
);
pInfo
->
bytes
=
bytes
;
}
memcpy
(
pInfo
->
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
pInfo
->
buf
+
bytes
)
=
cts
;
pInfo
->
hasResult
=
true
;
...
...
@@ -2474,6 +2483,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
int32_t
type
=
pInputCol
->
info
.
type
;
int32_t
bytes
=
pInputCol
->
info
.
bytes
;
pInfo
->
bytes
=
bytes
;
...
...
@@ -2501,6 +2511,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
TSKEY
cts
=
getRowPTs
(
pInput
->
pPTS
,
i
);
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
pInfo
->
buf
+
bytes
)
<
cts
)
{
if
(
IS_VAR_DATA_TYPE
(
type
))
{
bytes
=
varDataTLen
(
data
);
pInfo
->
bytes
=
bytes
;
}
memcpy
(
pInfo
->
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
pInfo
->
buf
+
bytes
)
=
cts
;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
...
...
@@ -2520,6 +2534,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
TSKEY
cts
=
getRowPTs
(
pInput
->
pPTS
,
i
);
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
pInfo
->
buf
+
bytes
)
<
cts
)
{
if
(
IS_VAR_DATA_TYPE
(
type
))
{
bytes
=
varDataTLen
(
data
);
pInfo
->
bytes
=
bytes
;
}
memcpy
(
pInfo
->
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
pInfo
->
buf
+
bytes
)
=
cts
;
pInfo
->
hasResult
=
true
;
...
...
source/libs/stream/inc/streamInc.h
浏览文件 @
b6bddd3f
...
...
@@ -23,6 +23,13 @@
extern
"C"
{
#endif
typedef
struct
{
int8_t
inited
;
void
*
timer
;
}
SStreamGlobalEnv
;
static
SStreamGlobalEnv
streamEnv
;
int32_t
streamExec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamDispatch
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamDispatchReqToData
(
const
SStreamDispatchReq
*
pReq
,
SStreamDataBlock
*
pData
);
...
...
source/libs/stream/src/stream.c
浏览文件 @
b6bddd3f
...
...
@@ -14,8 +14,74 @@
*/
#include "streamInc.h"
#include "ttimer.h"
int32_t
streamTriggerByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
)
{
int32_t
streamInit
()
{
int8_t
old
;
while
(
1
)
{
old
=
atomic_val_compare_exchange_8
(
&
streamEnv
.
inited
,
0
,
2
);
if
(
old
!=
2
)
break
;
}
if
(
old
==
0
)
{
streamEnv
.
timer
=
taosTmrInit
(
10000
,
100
,
10000
,
"STREAM"
);
if
(
streamEnv
.
timer
==
NULL
)
{
atomic_store_8
(
&
streamEnv
.
inited
,
0
);
return
-
1
;
}
atomic_store_8
(
&
streamEnv
.
inited
,
1
);
}
return
0
;
}
void
streamCleanUp
()
{
int8_t
old
;
while
(
1
)
{
old
=
atomic_val_compare_exchange_8
(
&
streamEnv
.
inited
,
1
,
2
);
if
(
old
!=
2
)
break
;
}
if
(
old
==
1
)
{
taosTmrCleanUp
(
streamEnv
.
timer
);
atomic_store_8
(
&
streamEnv
.
inited
,
0
);
}
}
void
streamTriggerByTimer
(
void
*
param
,
void
*
tmrId
)
{
SStreamTask
*
pTask
=
(
void
*
)
param
;
if
(
atomic_load_8
(
&
pTask
->
triggerStatus
)
==
TASK_TRIGGER_STATUS__ACTIVE
)
{
SStreamTrigger
*
trigger
=
taosAllocateQitem
(
sizeof
(
SStreamTrigger
),
DEF_QITEM
);
if
(
trigger
==
NULL
)
return
;
trigger
->
type
=
STREAM_INPUT__TRIGGER
;
trigger
->
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
if
(
trigger
->
pBlock
==
NULL
)
{
taosFreeQitem
(
trigger
);
return
;
}
trigger
->
pBlock
->
info
.
type
=
STREAM_GET_ALL
;
atomic_store_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN_ACTIVE
);
streamTaskInput
(
pTask
,
(
SStreamQueueItem
*
)
trigger
);
streamLaunchByWrite
(
pTask
,
pTask
->
nodeId
,
pTask
->
pMsgCb
);
}
taosTmrReset
(
streamTriggerByTimer
,
(
int32_t
)
pTask
->
triggerParam
,
pTask
,
streamEnv
.
timer
,
&
pTask
->
timer
);
}
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
)
{
if
(
pTask
->
triggerParam
!=
0
)
{
if
(
streamInit
()
<
0
)
{
return
-
1
;
}
pTask
->
timer
=
taosTmrStart
(
streamTriggerByTimer
,
(
int32_t
)
pTask
->
triggerParam
,
pTask
,
streamEnv
.
timer
);
pTask
->
triggerStatus
=
TASK_TRIGGER_STATUS__IN_ACTIVE
;
}
return
0
;
}
int32_t
streamLaunchByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
)
{
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
status
);
if
(
execStatus
==
TASK_STATUS__IDLE
||
execStatus
==
TASK_STATUS__CLOSING
)
{
SStreamTaskRunReq
*
pRunReq
=
rpcMallocCont
(
sizeof
(
SStreamTaskRunReq
));
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
b6bddd3f
...
...
@@ -20,15 +20,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
void
*
exec
=
pTask
->
exec
.
executor
;
// set input
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamQueueItem
*
pItem
=
(
SStreamQueueItem
*
)
data
;
if
(
pItem
->
type
==
STREAM_INPUT__TRIGGER
)
{
SStreamTrigger
*
pTrigger
=
(
SStreamTrigger
*
)
data
;
qSetMultiStreamInput
(
exec
,
pTrigger
->
pBlock
,
1
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
false
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
ASSERT
(
pSubmit
->
type
==
STREAM_INPUT__DATA_SUBMIT
);
ASSERT
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
);
qSetStreamInput
(
exec
,
pSubmit
->
data
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
,
false
);
}
else
if
(
p
Task
->
inputT
ype
==
STREAM_INPUT__DATA_BLOCK
)
{
}
else
if
(
p
Item
->
t
ype
==
STREAM_INPUT__DATA_BLOCK
)
{
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
ASSERT
(
pBlock
->
type
==
STREAM_INPUT__DATA_BLOCK
);
ASSERT
(
pTask
->
inputType
==
STREAM_INPUT__DATA_BLOCK
);
SArray
*
blocks
=
pBlock
->
blocks
;
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
false
);
}
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
b6bddd3f
...
...
@@ -72,6 +72,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tSerializeSUseDbRspImp
(
pEncoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
/*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
}
if
(
tEncodeI64
(
pEncoder
,
pTask
->
triggerParam
)
<
0
)
return
-
1
;
/*tEndEncode(pEncoder);*/
return
pEncoder
->
pos
;
...
...
@@ -121,6 +122,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
if
(
tDeserializeSUseDbRspImp
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
triggerParam
)
<
0
)
return
-
1
;
/*tEndDecode(pDecoder);*/
return
0
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
b6bddd3f
...
...
@@ -149,12 +149,14 @@ void syncStop(int64_t rid) {
int32_t
syncSetStandby
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
// state change
...
...
@@ -177,7 +179,8 @@ int32_t syncSetStandby(int64_t rid) {
int32_t
syncReconfigBuild
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
...
...
@@ -201,7 +204,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
if
(
!
IamInNew
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_NOT_IN_NEW_CONFIG
;
terrno
=
TSDB_CODE_SYN_NOT_IN_NEW_CONFIG
;
return
-
1
;
}
char
*
newconfig
=
syncCfg2Str
((
SSyncCfg
*
)
pNewCfg
);
...
...
@@ -219,7 +223,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
...
...
@@ -246,7 +251,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
if
(
!
IamInNew
)
{
sError
(
"sync reconfig error, not in new config"
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_NOT_IN_NEW_CONFIG
;
terrno
=
TSDB_CODE_SYN_NOT_IN_NEW_CONFIG
;
return
-
1
;
}
char
*
newconfig
=
syncCfg2Str
((
SSyncCfg
*
)
pNewCfg
);
...
...
@@ -272,13 +278,15 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
int32_t
syncLeaderTransfer
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
->
peersNum
==
0
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SNodeInfo
newLeader
=
(
pSyncNode
->
peersNodeInfo
)[
0
];
...
...
@@ -291,7 +299,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
int32_t
syncLeaderTransferTo
(
int64_t
rid
,
SNodeInfo
newLeader
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
ret
=
0
;
...
...
@@ -299,7 +308,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
if
(
pSyncNode
->
replicaNum
==
1
)
{
sError
(
"only one replica, cannot drop leader"
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_ONLY_ONE_REPLICA
;
terrno
=
TSDB_CODE_SYN_ONE_REPLICA
;
return
-
1
;
}
SyncLeaderTransfer
*
pMsg
=
syncLeaderTransferBuild
(
pSyncNode
->
vgId
);
...
...
@@ -538,11 +548,12 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
}
int32_t
syncPropose
(
int64_t
rid
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
TAOS_SYNC_PROPOSE_SUCCESS
;
int32_t
ret
=
0
;
SSyncNode
*
pSyncNode
=
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
assert
(
rid
==
pSyncNode
->
rid
);
sDebug
(
"vgId:%d sync event propose msgType:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
...
...
@@ -553,7 +564,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
}
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
TAOS_SYNC_PROPOSE_SUCCESS
;
int32_t
ret
=
0
;
sDebug
(
"vgId:%d sync event propose msgType:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
...
...
@@ -567,14 +578,17 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
syncClientRequest2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
if
(
pSyncNode
->
FpEqMsg
!=
NULL
&&
(
*
pSyncNode
->
FpEqMsg
)(
pSyncNode
->
msgcb
,
&
rpcMsg
)
==
0
)
{
ret
=
TAOS_SYNC_PROPOSE_SUCCESS
;
ret
=
0
;
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
sError
(
"syncPropose pSyncNode->FpEqMsg is NULL"
);
}
syncClientRequestDestroy
(
pSyncMsg
);
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
sError
(
"syncPropose not leader, %s"
,
syncUtilState2String
(
pSyncNode
->
state
));
ret
=
TAOS_SYNC_PROPOSE_NOT_LEADER
;
}
return
ret
;
...
...
@@ -945,9 +959,13 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) {
// timer control --------------
int32_t
syncNodeStartPingTimer
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
=
0
;
if
(
syncEnvIsStart
())
{
taosTmrReset
(
pSyncNode
->
FpPingTimerCB
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
atomic_store_64
(
&
pSyncNode
->
pingTimerLogicClock
,
pSyncNode
->
pingTimerLogicClockUser
);
}
else
{
sError
(
"sync env is stop, syncNodeStartPingTimer"
);
}
return
ret
;
}
...
...
@@ -961,10 +979,14 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
int32_t
syncNodeStartElectTimer
(
SSyncNode
*
pSyncNode
,
int32_t
ms
)
{
int32_t
ret
=
0
;
if
(
syncEnvIsStart
())
{
pSyncNode
->
electTimerMS
=
ms
;
taosTmrReset
(
pSyncNode
->
FpElectTimerCB
,
pSyncNode
->
electTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pElectTimer
);
atomic_store_64
(
&
pSyncNode
->
electTimerLogicClock
,
pSyncNode
->
electTimerLogicClockUser
);
}
else
{
sError
(
"sync env is stop, syncNodeStartElectTimer"
);
}
return
ret
;
}
...
...
@@ -998,9 +1020,13 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t
syncNodeStartHeartbeatTimer
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
=
0
;
if
(
syncEnvIsStart
())
{
taosTmrReset
(
pSyncNode
->
FpHeartbeatTimerCB
,
pSyncNode
->
heartbeatTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pHeartbeatTimer
);
atomic_store_64
(
&
pSyncNode
->
heartbeatTimerLogicClock
,
pSyncNode
->
heartbeatTimerLogicClockUser
);
}
else
{
sError
(
"sync env is stop, syncNodeStartHeartbeatTimer"
);
}
return
ret
;
}
...
...
@@ -1720,14 +1746,25 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
syncTimeout2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
syncRpcMsgLog2
((
char
*
)
"==syncNodeEqPingTimer=="
,
&
rpcMsg
);
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue ping msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
}
}
else
{
sTrace
(
"syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"
);
}
syncTimeoutDestroy
(
pSyncMsg
);
if
(
syncEnvIsStart
())
{
taosTmrReset
(
syncNodeEqPingTimer
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
}
else
{
sError
(
"sync env is stop, syncNodeEqPingTimer"
);
}
}
else
{
sTrace
(
"==syncNodeEqPingTimer== pingTimerLogicClock:%"
PRIu64
", pingTimerLogicClockUser:%"
PRIu64
""
,
pSyncNode
->
pingTimerLogicClock
,
pSyncNode
->
pingTimerLogicClockUser
);
...
...
@@ -1743,16 +1780,26 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
syncTimeout2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
syncRpcMsgLog2
((
char
*
)
"==syncNodeEqElectTimer=="
,
&
rpcMsg
);
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue elect msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
}
}
else
{
sTrace
(
"syncNodeEqElectTimer
pSyncNode->
FpEqMsg is NULL"
);
sTrace
(
"syncNodeEqElectTimer FpEqMsg is NULL"
);
}
syncTimeoutDestroy
(
pSyncMsg
);
// reset timer ms
if
(
syncEnvIsStart
())
{
pSyncNode
->
electTimerMS
=
syncUtilElectRandomMS
(
pSyncNode
->
electBaseLine
,
2
*
pSyncNode
->
electBaseLine
);
taosTmrReset
(
syncNodeEqElectTimer
,
pSyncNode
->
electTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pElectTimer
);
}
else
{
sError
(
"sync env is stop, syncNodeEqElectTimer"
);
}
}
else
{
sTrace
(
"==syncNodeEqElectTimer== electTimerLogicClock:%"
PRIu64
", electTimerLogicClockUser:%"
PRIu64
""
,
pSyncNode
->
electTimerLogicClock
,
pSyncNode
->
electTimerLogicClockUser
);
...
...
@@ -1774,19 +1821,19 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue timer msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
}
}
else
{
s
Trace
(
"syncNodeEqHeartbeatTimer pSyncNode->
FpEqMsg is NULL"
);
s
Error
(
"syncNodeEqHeartbeatTimer
FpEqMsg is NULL"
);
}
syncTimeoutDestroy
(
pSyncMsg
);
if
(
gSyncEnv
!=
NULL
)
{
if
(
syncEnvIsStart
()
)
{
taosTmrReset
(
syncNodeEqHeartbeatTimer
,
pSyncNode
->
heartbeatTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pHeartbeatTimer
);
}
else
{
sError
(
"sync env is
already stop
"
);
sError
(
"sync env is
stop, syncNodeEqHeartbeatTimer
"
);
}
}
else
{
sTrace
(
"==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%"
PRIu64
", heartbeatTimerLogicClockUser:%"
PRIu64
...
...
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
浏览文件 @
b6bddd3f
...
...
@@ -338,7 +338,7 @@ int main(int argc, char** argv) {
if
(
alreadySend
<
writeRecordNum
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
alreadySend
,
writeRecordNum
,
myIndex
);
int32_t
ret
=
syncPropose
(
rid
,
pRpcMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
if
(
ret
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
sTrace
(
"%s value%d write not leader"
,
s
,
alreadySend
);
}
else
{
assert
(
ret
==
0
);
...
...
source/libs/sync/test/syncConfigChangeTest.cpp
浏览文件 @
b6bddd3f
...
...
@@ -251,7 +251,7 @@ int main(int argc, char** argv) {
if
(
alreadySend
<
writeRecordNum
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
alreadySend
,
writeRecordNum
,
myIndex
);
int32_t
ret
=
syncPropose
(
rid
,
pRpcMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
if
(
ret
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
sTrace
(
"%s value%d write not leader"
,
s
,
alreadySend
);
}
else
{
assert
(
ret
==
0
);
...
...
source/libs/sync/test/syncReplicateTest.cpp
浏览文件 @
b6bddd3f
...
...
@@ -188,7 +188,7 @@ int main(int argc, char** argv) {
if
(
alreadySend
<
writeRecordNum
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
alreadySend
,
writeRecordNum
,
myIndex
);
int32_t
ret
=
syncPropose
(
rid
,
pRpcMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
if
(
ret
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
sTrace
(
"%s value%d write not leader"
,
s
,
alreadySend
);
}
else
{
assert
(
ret
==
0
);
...
...
source/libs/sync/test/syncTestTool.cpp
浏览文件 @
b6bddd3f
...
...
@@ -391,7 +391,7 @@ int main(int argc, char** argv) {
if
(
alreadySend
<
writeRecordNum
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
alreadySend
,
writeRecordNum
,
myIndex
);
int32_t
ret
=
syncPropose
(
rid
,
pRpcMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
if
(
ret
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
sTrace
(
"%s value%d write not leader, leaderTransferWait:%d"
,
simpleStr
,
alreadySend
,
leaderTransferWait
);
}
else
{
assert
(
ret
==
0
);
...
...
source/util/src/tarray.c
浏览文件 @
b6bddd3f
...
...
@@ -174,7 +174,11 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
void
*
taosArrayAddAll
(
SArray
*
pArray
,
const
SArray
*
pInput
)
{
if
(
pInput
)
{
return
taosArrayAddBatch
(
pArray
,
pInput
->
pData
,
(
int32_t
)
taosArrayGetSize
(
pInput
));
}
else
{
return
NULL
;
}
}
void
*
taosArrayPop
(
SArray
*
pArray
)
{
...
...
source/util/src/terror.c
浏览文件 @
b6bddd3f
...
...
@@ -413,6 +413,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INVALID_MSGTYPE
,
"Invalid msg type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_NOT_LEADER
,
"Sync not leader"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_ONE_REPLICA
,
"Sync one replica"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_NOT_IN_NEW_CONFIG
,
"Sync not in new config"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INTERNAL_ERROR
,
"Sync internal error"
)
// wal
...
...
tests/system-test/7-tmq/basic5.py
浏览文件 @
b6bddd3f
...
...
@@ -134,7 +134,7 @@ class TDTestCase:
parameterDict
[
'cfg'
]
=
cfgPath
prepareEnvThread
=
threading
.
Thread
(
target
=
self
.
prepareEnv
,
kwargs
=
parameterDict
)
prepareEnvThread
.
start
()
time
.
sleep
(
2
)
prepareEnvThread
.
join
(
)
# wait stb ready
while
1
:
...
...
@@ -245,6 +245,7 @@ class TDTestCase:
prepareEnvThread
=
threading
.
Thread
(
target
=
self
.
prepareEnv
,
kwargs
=
parameterDict
)
prepareEnvThread
.
start
()
prepareEnvThread
.
join
()
# wait db ready
while
1
:
...
...
@@ -371,6 +372,7 @@ class TDTestCase:
prepareEnvThread
=
threading
.
Thread
(
target
=
self
.
prepareEnv
,
kwargs
=
parameterDict
)
prepareEnvThread
.
start
()
prepareEnvThread
.
join
()
# wait db ready
while
1
:
...
...
tests/system-test/test-all.bat
浏览文件 @
b6bddd3f
...
...
@@ -61,7 +61,8 @@ goto :eof
set
tt
=
%
1
set
tt
=
%tt
:.
=
%
set
tt
=
%tt
::
=
%
set
index
=
1
set
tt
=
%tt
:
0
=
%
set
/a
index
=
1
for
%%a
in
(
%tt%
)
do
(
if
!index!
EQU
1
(
set
/a
hh
=
%%a
...
...
@@ -75,5 +76,5 @@ for %%a in (%tt%) do (
)
set
/a
index
=
index
+
1
)
set
/a
_timeTemp
=(
%hh%
*
60
+
%mm%
)*
60
+
%ss%
||
echo
hh
:
%hh%
mm
:
%mm%
ss
:
%ss%
set
/a
_timeTemp
=(
%hh%
*
60
+
%mm%
)*
60
+
%ss%
goto
:eof
\ No newline at end of file
tests/system-test/test.py
浏览文件 @
b6bddd3f
...
...
@@ -21,6 +21,7 @@ import base64
import
json
import
platform
import
socket
import
threading
from
distutils.log
import
warn
as
printf
from
fabric2
import
Connection
sys
.
path
.
append
(
"../pytest"
)
...
...
@@ -30,6 +31,13 @@ from util.cases import *
import
taos
def
checkRunTimeError
():
import
win32gui
while
1
:
time
.
sleep
(
1
)
hwnd
=
win32gui
.
FindWindow
(
None
,
"Microsoft Visual C++ Runtime Library"
)
if
hwnd
:
os
.
system
(
"TASKKILL /F /IM taosd.exe"
)
if
__name__
==
"__main__"
:
...
...
@@ -42,9 +50,6 @@ if __name__ == "__main__":
logSql
=
True
stop
=
0
restart
=
False
windows
=
0
if
platform
.
system
().
lower
()
==
'windows'
:
windows
=
1
updateCfgDict
=
{}
execCmd
=
""
opts
,
args
=
getopt
.
gnu_getopt
(
sys
.
argv
[
1
:],
'f:p:m:l:scghrd:k:e:'
,
[
...
...
@@ -159,7 +164,9 @@ if __name__ == "__main__":
host
=
masterIp
tdLog
.
info
(
"Procedures for tdengine deployed in %s"
%
(
host
))
if
windows
:
if
platform
.
system
().
lower
()
==
'windows'
:
if
(
masterIp
==
""
and
not
fileName
[
0
:
12
]
==
"0-others
\\
udf"
):
threading
.
Thread
(
target
=
checkRunTimeError
,
daemon
=
True
).
start
()
tdCases
.
logSql
(
logSql
)
tdLog
.
info
(
"Procedures for testing self-deployment"
)
tdDnodes
.
init
(
deployPath
,
masterIp
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录