Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
60bb2c18
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
60bb2c18
编写于
11月 01, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/stream
上级
94e7c79c
d53ab62f
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
224 addition
and
475 deletion
+224
-475
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+0
-1
include/libs/sync/sync.h
include/libs/sync/sync.h
+2
-10
include/os/osDef.h
include/os/osDef.h
+10
-0
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+1
-19
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-17
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+5
-115
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+0
-1
source/dnode/vnode/src/tsdb/tsdbOpen.c
source/dnode/vnode/src/tsdb/tsdbOpen.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+17
-11
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+4
-130
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+2
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+3
-18
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+6
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+21
-41
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+50
-0
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+0
-1
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+1
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+88
-102
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/script/tsim/vnode/replica3_repeat.sim
tests/script/tsim/vnode/replica3_repeat.sim
+10
-1
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
60bb2c18
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
719fc88
GIT_TAG
16eb34f
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
include/dnode/mnode/mnode.h
浏览文件 @
60bb2c18
...
...
@@ -99,7 +99,6 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
*/
int32_t
mndProcessRpcMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessSyncCtrlMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndPreProcessQueryMsg
(
SRpcMsg
*
pMsg
);
void
mndPostProcessQueryMsg
(
SRpcMsg
*
pMsg
);
...
...
include/libs/sync/sync.h
浏览文件 @
60bb2c18
...
...
@@ -220,21 +220,13 @@ const char* syncStr(ESyncState state);
bool
syncIsRestoreFinish
(
int64_t
rid
);
int32_t
syncGetSnapshotByIndex
(
int64_t
rid
,
SyncIndex
index
,
SSnapshot
*
pSnapshot
);
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pNewCfg
);
// build SRpcMsg, need to call syncPropose with SRpcMsg
int32_t
syncReconfigBuild
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
);
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pCfg
);
int32_t
syncLeaderTransfer
(
int64_t
rid
);
int32_t
syncLeaderTransferTo
(
int64_t
rid
,
SNodeInfo
newLeader
);
int32_t
syncBeginSnapshot
(
int64_t
rid
,
int64_t
lastApplyIndex
);
int32_t
syncEndSnapshot
(
int64_t
rid
);
int32_t
syncStepDown
(
int64_t
rid
,
SyncTerm
newTerm
);
SSyncNode
*
syncNodeAcquire
(
int64_t
rid
);
void
syncNodeRelease
(
SSyncNode
*
pNode
);
int32_t
syncProcessMsg
(
int64_t
rid
,
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
include/os/osDef.h
浏览文件 @
60bb2c18
...
...
@@ -244,6 +244,16 @@ void syslog(int unused, const char *format, ...);
#define TD_CHARSET_LEN 64
#define TD_TIMEZONE_LEN 96
#ifdef WINDOWS
#define TD_PATH_MAX 260
#elif defined(PATH_MAX)
#define TD_PATH_MAX PATH_MAX
#elif defined(_XOPEN_PATH_MAX)
#define TD_PATH_MAX _XOPEN_PATH_MAX
#else
#define TD_PATH_MAX _POSIX_PATH_MAX
#endif
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
60bb2c18
...
...
@@ -67,24 +67,6 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem
(
pMsg
);
}
static
void
mmProcessSyncCtrlMsg
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
dGTrace
(
"msg:%p, get from mnode-sync-ctrl queue"
,
pMsg
);
SMsgHead
*
pHead
=
pMsg
->
pCont
;
pHead
->
contLen
=
ntohl
(
pHead
->
contLen
);
pHead
->
vgId
=
ntohl
(
pHead
->
vgId
);
int32_t
code
=
mndProcessSyncCtrlMsg
(
pMsg
);
dGTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
mmProcessSyncMsg
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
...
...
@@ -252,7 +234,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-sync-ctrl"
,
.
fp
=
(
FItem
)
mmProcessSync
Ctrl
Msg
,
.
fp
=
(
FItem
)
mmProcessSyncMsg
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
syncCtrlWorker
,
&
scCfg
)
!=
0
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
60bb2c18
...
...
@@ -133,22 +133,6 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
}
}
static
void
vmProcessSyncCtrlQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
if
(
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
)
==
0
)
continue
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
dGTrace
(
"vgId:%d, msg:%p get from vnode-sync queue"
,
pVnode
->
vgId
,
pMsg
);
int32_t
code
=
vnodeProcessSyncCtrlMsg
(
pVnode
->
pImpl
,
pMsg
,
NULL
);
// no response here
dGTrace
(
"vgId:%d, msg:%p is freed, code:0x%x"
,
pVnode
->
vgId
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
}
static
int32_t
vmPutMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
,
EQueueType
qtype
)
{
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
SMsgHead
*
pHead
=
pMsg
->
pCont
;
...
...
@@ -317,7 +301,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t
vmAllocQueue
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
pVnode
->
pWriteQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
->
pImpl
,
(
FItems
)
vnodeProposeWriteMsg
);
pVnode
->
pSyncQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
pSyncCtrlQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncCtrlPool
,
pVnode
,
(
FItems
)
vmProcessSync
Ctrl
Queue
);
pVnode
->
pSyncCtrlQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncCtrlPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
pApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
applyPool
,
pVnode
->
pImpl
,
(
FItems
)
vnodeApplyWriteMsg
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pStreamQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
streamPool
,
pVnode
,
(
FItem
)
vmProcessStreamQueue
);
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
60bb2c18
...
...
@@ -474,128 +474,18 @@ void mndStop(SMnode *pMnode) {
mndCleanupTimer
(
pMnode
);
}
int32_t
mndProcessSyncCtrlMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
int32_t
code
=
0
;
mInfo
(
"vgId:%d, process sync ctrl msg"
,
1
);
if
(
!
syncIsInit
())
{
mError
(
"failed to process sync msg:%p type:%s since syncEnv stop"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pMgmt
->
sync
);
if
(
pSyncNode
==
NULL
)
{
mError
(
"failed to process sync msg:%p type:%s since syncNode is null"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT
)
{
SyncHeartbeat
*
pSyncMsg
=
syncHeartbeatFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeat
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT_REPLY
)
{
SyncHeartbeatReply
*
pSyncMsg
=
syncHeartbeatReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeatReply
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatReplyDestroy
(
pSyncMsg
);
}
syncNodeRelease
(
pSyncNode
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
code
;
}
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
int32_t
code
=
0
;
if
(
!
syncIsInit
())
{
mError
(
"failed to process sync msg:%p type:%s since syncEnv stop"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pMgmt
->
sync
);
if
(
pSyncNode
==
NULL
)
{
mError
(
"failed to process sync msg:%p type:%s since syncNode is null"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnTimer
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnPing
(
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnPingReply
(
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnClientRequest
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVote
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE_REPLY
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVoteReply
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntries
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES_REPLY
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntriesReply
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshot
(
pSyncNode
,
pSyncMsg
);
syncSnapshotSendDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_RSP
)
{
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshotReply
(
pSyncNode
,
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_LOCAL_CMD
)
{
SyncLocalCmd
*
pSyncMsg
=
syncLocalCmdFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnLocalCmd
(
pSyncNode
,
pSyncMsg
);
syncLocalCmdDestroy
(
pSyncMsg
);
}
else
{
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
-
1
;
}
syncNodeRelease
(
pSyncNode
);
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
mGTrace
(
"vgId:1, sync msg:%p will be processed, type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
int32_t
code
=
syncProcessMsg
(
pMgmt
->
sync
,
pMsg
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
mGError
(
"vgId:1, failed to process sync msg:%p type:%s since %s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
terrstr
())
;
}
return
code
;
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
60bb2c18
...
...
@@ -83,7 +83,6 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t
vnodeProcessWriteMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
);
int32_t
vnodeProcessSyncMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSyncCtrlMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
void
vnodeProposeWriteMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
...
...
source/dnode/vnode/src/tsdb/tsdbOpen.c
浏览文件 @
60bb2c18
...
...
@@ -48,7 +48,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
}
pTsdb
->
path
=
(
char
*
)
&
pTsdb
[
1
];
s
printf
(
pTsdb
->
path
,
"%s%s%s"
,
pVnode
->
path
,
TD_DIRSEP
,
dir
);
s
nprintf
(
pTsdb
->
path
,
TD_PATH_MAX
,
"%s%s%s"
,
pVnode
->
path
,
TD_DIRSEP
,
dir
);
taosRealPath
(
pTsdb
->
path
,
NULL
,
slen
);
pTsdb
->
pVnode
=
pVnode
;
taosThreadRwlockInit
(
&
pTsdb
->
rwLock
,
NULL
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
60bb2c18
...
...
@@ -78,6 +78,7 @@ typedef struct SIOCostSummary {
double
lastBlockLoadTime
;
int64_t
composedBlocks
;
double
buildComposedBlockTime
;
double
createScanInfoList
;
}
SIOCostSummary
;
typedef
struct
SBlockLoadSuppInfo
{
...
...
@@ -240,6 +241,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return
NULL
;
}
int64_t
st
=
taosGetTimestampUs
();
for
(
int32_t
j
=
0
;
j
<
numOfTables
;
++
j
)
{
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
idList
[
j
].
uid
};
if
(
ASCENDING_TRAVERSE
(
pTsdbReader
->
order
))
{
...
...
@@ -255,8 +258,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
pTsdbReader
->
idStr
);
}
tsdbDebug
(
"%p create %d tables scan-info, size:%.2f Kb, %s"
,
pTsdbReader
,
numOfTables
,
(
sizeof
(
STableBlockScanInfo
)
*
numOfTables
)
/
1024
.
0
,
pTsdbReader
->
idStr
);
pTsdbReader
->
cost
.
createScanInfoList
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
tsdbDebug
(
"%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s"
,
pTsdbReader
,
numOfTables
,
(
sizeof
(
STableBlockScanInfo
)
*
numOfTables
)
/
1024
.
0
,
pTsdbReader
->
cost
.
createScanInfoList
,
pTsdbReader
->
idStr
);
return
pTableMap
;
}
...
...
@@ -3698,15 +3703,16 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree
(
pLReader
);
}
tsdbDebug
(
"%p :io-cost summary: head-file:%"
PRIu64
", head-file time:%.2f ms, SMA:%"
PRId64
" SMA-time:%.2f ms, fileBlocks:%"
PRId64
", fileBlocks-load-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastBlocks:%"
PRId64
", lastBlocks-time:%.2f ms, composed-blocks:%"
PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s"
,
pReader
,
pCost
->
headFileLoad
,
pCost
->
headFileLoadTime
,
pCost
->
smaDataLoad
,
pCost
->
smaLoadTime
,
pCost
->
numOfBlocks
,
pCost
->
blockLoadTime
,
pCost
->
buildmemBlock
,
pCost
->
lastBlockLoad
,
pCost
->
lastBlockLoadTime
,
pCost
->
composedBlocks
,
pCost
->
buildComposedBlockTime
,
numOfTables
*
sizeof
(
STableBlockScanInfo
)
/
1000
.
0
,
pReader
->
idStr
);
tsdbDebug
(
"%p :io-cost summary: head-file:%"
PRIu64
", head-file time:%.2f ms, SMA:%"
PRId64
" SMA-time:%.2f ms, fileBlocks:%"
PRId64
", fileBlocks-load-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastBlocks:%"
PRId64
", lastBlocks-time:%.2f ms, composed-blocks:%"
PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s"
,
pReader
,
pCost
->
headFileLoad
,
pCost
->
headFileLoadTime
,
pCost
->
smaDataLoad
,
pCost
->
smaLoadTime
,
pCost
->
numOfBlocks
,
pCost
->
blockLoadTime
,
pCost
->
buildmemBlock
,
pCost
->
lastBlockLoad
,
pCost
->
lastBlockLoadTime
,
pCost
->
composedBlocks
,
pCost
->
buildComposedBlockTime
,
numOfTables
*
sizeof
(
STableBlockScanInfo
)
/
1000
.
0
,
pCost
->
createScanInfoList
,
pReader
->
idStr
);
taosMemoryFree
(
pReader
->
idStr
);
taosMemoryFree
(
pReader
->
pSchema
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
60bb2c18
...
...
@@ -230,142 +230,16 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
}
}
int32_t
vnodeProcessSyncCtrlMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
code
=
0
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
if
(
!
syncIsInit
())
{
vGError
(
"vgId:%d, msg:%p failed to process since sync env not start"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_APP_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pVnode
->
sync
);
if
(
pSyncNode
==
NULL
)
{
vGError
(
"vgId:%d, msg:%p failed to process since invalid sync node"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
vGTrace
(
"vgId:%d, sync msg:%p will be processed, type:%s"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT
)
{
SyncHeartbeat
*
pSyncMsg
=
syncHeartbeatFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeat
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT_REPLY
)
{
SyncHeartbeatReply
*
pSyncMsg
=
syncHeartbeatReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeatReply
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatReplyDestroy
(
pSyncMsg
);
}
else
{
vGError
(
"vgId:%d, msg:%p failed to process since error msg type:%d"
,
pVnode
->
config
.
vgId
,
pMsg
,
pMsg
->
msgType
);
code
=
-
1
;
}
vTrace
(
"vgId:%d, sync msg:%p is processed, type:%s code:0x%x"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
code
);
syncNodeRelease
(
pSyncNode
);
if
(
code
!=
0
&&
terrno
==
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
code
;
}
int32_t
vnodeProcessSyncMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
code
=
0
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
if
(
!
syncIsInit
())
{
vGError
(
"vgId:%d, msg:%p failed to process since sync env not start"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_APP_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pVnode
->
sync
);
if
(
pSyncNode
==
NULL
)
{
vGError
(
"vgId:%d, msg:%p failed to process since invalid sync node"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
vGTrace
(
"vgId:%d, sync msg:%p will be processed, type:%s"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnTimer
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnPing
(
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnPingReply
(
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnClientRequest
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnRequestVote
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE_REPLY
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnRequestVoteReply
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnAppendEntries
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES_REPLY
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnAppendEntriesReply
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshot
(
pSyncNode
,
pSyncMsg
);
syncSnapshotSendDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_RSP
)
{
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshotReply
(
pSyncNode
,
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_LOCAL_CMD
)
{
SyncLocalCmd
*
pSyncMsg
=
syncLocalCmdFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnLocalCmd
(
pSyncNode
,
pSyncMsg
);
syncLocalCmdDestroy
(
pSyncMsg
);
}
else
{
vGError
(
"vgId:%d, msg:%p failed to process since error msg type:%d"
,
pVnode
->
config
.
vgId
,
pMsg
,
pMsg
->
msgType
);
code
=
-
1
;
int32_t
code
=
syncProcessMsg
(
pVnode
->
sync
,
pMsg
);
if
(
code
!=
0
)
{
vGError
(
"vgId:%d, failed to process sync msg:%p type:%s since %s"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
terrstr
());
}
vTrace
(
"vgId:%d, sync msg:%p is processed, type:%s code:0x%x"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
code
);
syncNodeRelease
(
pSyncNode
);
if
(
code
!=
0
&&
terrno
==
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
code
;
}
...
...
source/libs/executor/inc/executil.h
浏览文件 @
60bb2c18
...
...
@@ -96,11 +96,12 @@ typedef struct SColMatchInfo {
int32_t
matchType
;
// determinate the source according to col id or slot id
}
SColMatchInfo
;
typedef
struct
SExecTaskInfo
SExecTaskInfo
;
typedef
struct
STableListInfo
STableListInfo
;
struct
SqlFunctionCtx
;
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
id
);
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
SExecTaskInfo
*
pTaskInfo
);
STableListInfo
*
tableListCreate
();
void
*
tableListDestroy
(
STableListInfo
*
pTableListInfo
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
60bb2c18
...
...
@@ -89,25 +89,10 @@ typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
typedef
struct
STaskCostInfo
{
int64_t
created
;
int64_t
start
;
uint64_t
loadStatisTime
;
uint64_t
loadFileBlockTime
;
uint64_t
loadDataInCacheTime
;
uint64_t
loadStatisSize
;
uint64_t
loadFileBlockSize
;
uint64_t
loadDataInCacheSize
;
uint64_t
loadDataTime
;
uint64_t
elapsedTime
;
double
extractListTime
;
double
groupIdMapTime
;
SFileBlockLoadRecorder
*
pRecoder
;
uint64_t
elapsedTime
;
uint64_t
winInfoSize
;
uint64_t
tableInfoSize
;
uint64_t
hashSize
;
uint64_t
numOfTimeWindows
;
SArray
*
queryProfEvents
;
// SArray<SQueryProfEvent>
SHashObj
*
operatorProfResults
;
// map<operator_type, SQueryProfEvent>
}
STaskCostInfo
;
typedef
struct
SOperatorCostInfo
{
...
...
source/libs/executor/src/executil.c
浏览文件 @
60bb2c18
...
...
@@ -1888,8 +1888,9 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
idStr
)
{
struct
SExecTaskInfo
*
pTaskInfo
)
{
int64_t
st
=
taosGetTimestampUs
();
const
char
*
idStr
=
GET_TASKID
(
pTaskInfo
);
if
(
pHandle
==
NULL
)
{
qError
(
"invalid handle, in creating operator tree, %s"
,
idStr
);
...
...
@@ -1905,7 +1906,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
ASSERT
(
pTableListInfo
->
numOfOuputGroups
==
1
);
int64_t
st1
=
taosGetTimestampUs
();
qDebug
(
"generate queried table list completed, elapsed time:%.2f ms %s"
,
(
st1
-
st
)
/
1000
.
0
,
idStr
);
pTaskInfo
->
cost
.
extractListTime
=
(
st1
-
st
)
/
1000
.
0
;
qDebug
(
"extract queried table list completed, elapsed time:%.2f ms %s"
,
pTaskInfo
->
cost
.
extractListTime
,
idStr
);
if
(
taosArrayGetSize
(
pTableListInfo
->
pTableList
)
==
0
)
{
qDebug
(
"no table qualified for query, %s"
PRIx64
,
idStr
);
...
...
@@ -1917,8 +1919,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return
code
;
}
int64_t
st2
=
taosGetTimestampUs
()
;
qDebug
(
"generate group id map completed, elapsed time:%.2f ms %s"
,
(
st2
-
st1
)
/
1000
.
0
,
idStr
);
pTaskInfo
->
cost
.
groupIdMapTime
=
(
taosGetTimestampUs
()
-
st1
)
/
1000
.
0
;
qDebug
(
"generate group id map completed, elapsed time:%.2f ms %s"
,
pTaskInfo
->
cost
.
groupIdMapTime
,
idStr
);
return
TSDB_CODE_SUCCESS
;
}
source/libs/executor/src/executorimpl.c
浏览文件 @
60bb2c18
...
...
@@ -1400,40 +1400,18 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
}
}
static
int32_t
compressQueryColData
(
SColumnInfoData
*
pColRes
,
int32_t
numOfRows
,
char
*
data
,
int8_t
compressed
)
{
int32_t
colSize
=
pColRes
->
info
.
bytes
*
numOfRows
;
return
(
*
(
tDataTypes
[
pColRes
->
info
.
type
].
compFunc
))(
pColRes
->
pData
,
colSize
,
numOfRows
,
data
,
colSize
+
COMP_OVERFLOW_BYTES
,
compressed
,
NULL
,
0
);
}
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
// uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
// pSummary->hashSize = hashSize;
// SResultRowPool* p = pTaskInfo->pool;
// if (p != NULL) {
// pSummary->winInfoSize = getResultRowPoolMemSize(p);
// pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
// } else {
// pSummary->winInfoSize = 0;
// pSummary->numOfTimeWindows = 0;
// }
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total "
"rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
"load block SMA:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
}
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
...
...
@@ -3220,6 +3198,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
static
SExecTaskInfo
*
createExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
)
{
SExecTaskInfo
*
pTaskInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecTaskInfo
));
if
(
pTaskInfo
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
schemaInfo
.
dbname
=
strdup
(
dbFName
);
...
...
@@ -3350,7 +3333,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
idstr
);
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
qError
(
"failed to createScanTableListInfo, code:%s, %s"
,
tstrerror
(
code
),
idstr
);
...
...
@@ -3368,9 +3351,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
type
)
{
STableMergeScanPhysiNode
*
pTableScanNode
=
(
STableMergeScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
/*pTableScanNode->groupSort*/
true
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
idstr
);
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
true
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
qError
(
"failed to createScanTableListInfo, code: %s"
,
tstrerror
(
code
));
...
...
@@ -3395,7 +3378,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if
(
pHandle
->
vnode
)
{
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
idstr
);
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
qError
(
"failed to createScanTableListInfo, code: %s"
,
tstrerror
(
code
));
...
...
@@ -3422,7 +3405,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STagScanPhysiNode
*
pScanPhyNode
=
(
STagScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
pScanPhyNode
,
NULL
,
false
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
idstr
);
pTagIndexCond
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
qError
(
"failed to getTableList, code: %s"
,
tstrerror
(
code
));
...
...
@@ -3455,7 +3438,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SLastRowScanPhysiNode
*
pScanNode
=
(
SLastRowScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
&
pScanNode
->
scan
,
pScanNode
->
pGroupTags
,
true
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
idstr
);
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -3787,10 +3770,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
char
*
sql
,
EOPTR_EXEC_MODEL
model
)
{
uint64_t
queryId
=
pPlan
->
id
.
queryId
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
*
pTaskInfo
=
createExecTaskInfo
(
queryId
,
taskId
,
model
,
pPlan
->
dbFName
);
if
(
*
pTaskInfo
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_complete
;
}
...
...
@@ -3809,17 +3790,16 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
pPlan
->
pTagCond
,
pPlan
->
pTagIndexCond
,
pPlan
->
user
);
if
(
NULL
==
(
*
pTaskInfo
)
->
pRoot
)
{
code
=
(
*
pTaskInfo
)
->
code
;
terrno
=
(
*
pTaskInfo
)
->
code
;
goto
_complete
;
}
return
code
;
return
TSDB_CODE_SUCCESS
;
_complete:
_complete:
taosMemoryFree
(
sql
);
doDestroyTask
(
*
pTaskInfo
);
terrno
=
code
;
return
code
;
return
terrno
;
}
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
60bb2c18
...
...
@@ -2194,6 +2194,56 @@ int32_t leastSQRFunction(SqlFunctionCtx* pCtx) {
break
;
}
case
TSDB_DATA_TYPE_UTINYINT
:
{
uint8_t
*
plist
=
(
uint8_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
++
;
LEASTSQR_CAL
(
param
,
x
,
plist
,
i
,
pInfo
->
stepVal
);
}
break
;
}
case
TSDB_DATA_TYPE_USMALLINT
:
{
uint16_t
*
plist
=
(
uint16_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
++
;
LEASTSQR_CAL
(
param
,
x
,
plist
,
i
,
pInfo
->
stepVal
);
}
break
;
}
case
TSDB_DATA_TYPE_UINT
:
{
uint32_t
*
plist
=
(
uint32_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
++
;
LEASTSQR_CAL
(
param
,
x
,
plist
,
i
,
pInfo
->
stepVal
);
}
break
;
}
case
TSDB_DATA_TYPE_UBIGINT
:
{
uint64_t
*
plist
=
(
uint64_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
++
;
LEASTSQR_CAL
(
param
,
x
,
plist
,
i
,
pInfo
->
stepVal
);
}
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
float
*
plist
=
(
float
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
60bb2c18
...
...
@@ -302,7 +302,6 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t
syncGetSnapshotMetaByIndex
(
int64_t
rid
,
SyncIndex
snapshotIndex
,
struct
SSnapshotMeta
*
sMeta
);
bool
syncNodeCanChange
(
SSyncNode
*
pSyncNode
);
bool
syncNodeCheckNewConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
pNewCfg
);
int32_t
syncNodeLeaderTransfer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeLeaderTransferTo
(
SSyncNode
*
pSyncNode
,
SNodeInfo
newLeader
);
...
...
source/libs/sync/src/syncEnv.c
浏览文件 @
60bb2c18
...
...
@@ -80,6 +80,7 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
SSyncNode
*
pNode
=
taosAcquireRef
(
gNodeRefId
,
rid
);
if
(
pNode
==
NULL
)
{
sTrace
(
"failed to acquire node from refId:%"
PRId64
,
rid
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
pNode
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
60bb2c18
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sync.h"
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
...
...
@@ -34,8 +35,6 @@
#include "syncUtil.h"
#include "syncVoteMgr.h"
// ------ local funciton ---------
// enqueue message ----
static
void
syncNodeEqPingTimer
(
void
*
param
,
void
*
tmrId
);
static
void
syncNodeEqElectTimer
(
void
*
param
,
void
*
tmrId
);
static
void
syncNodeEqHeartbeatTimer
(
void
*
param
,
void
*
tmrId
);
...
...
@@ -44,158 +43,145 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
static
void
syncNodeEqPeerHeartbeatTimer
(
void
*
param
,
void
*
tmrId
);
static
bool
syncIsConfigChanged
(
const
SSyncCfg
*
pOldCfg
,
const
SSyncCfg
*
pNewCfg
);
// process message ----
int32_t
syncNodeOnPing
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
syncNodeOnPingReply
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pNode
=
syncNodeOpen
(
pSyncInfo
);
if
(
pNode
==
NULL
)
{
SSyncNode
*
p
Sync
Node
=
syncNodeOpen
(
pSyncInfo
);
if
(
p
Sync
Node
==
NULL
)
{
sError
(
"vgId:%d, failed to open sync node"
,
pSyncInfo
->
vgId
);
return
-
1
;
}
p
Node
->
rid
=
syncNodeAdd
(
p
Node
);
if
(
pNode
->
rid
<
0
)
{
syncNodeClose
(
pNode
);
p
SyncNode
->
rid
=
syncNodeAdd
(
pSync
Node
);
if
(
p
Sync
Node
->
rid
<
0
)
{
syncNodeClose
(
p
Sync
Node
);
return
-
1
;
}
return
pNode
->
rid
;
return
p
Sync
Node
->
rid
;
}
void
syncStart
(
int64_t
rid
)
{
SSyncNode
*
pNode
=
syncNodeAcquire
(
rid
);
if
(
pNode
!=
NULL
)
{
syncNodeStart
(
pNode
);
syncNodeRelease
(
pNode
);
SSyncNode
*
p
Sync
Node
=
syncNodeAcquire
(
rid
);
if
(
p
Sync
Node
!=
NULL
)
{
syncNodeStart
(
p
Sync
Node
);
syncNodeRelease
(
p
Sync
Node
);
}
}
void
syncStop
(
int64_t
rid
)
{
SSyncNode
*
pNode
=
syncNodeAcquire
(
rid
);
if
(
pNode
!=
NULL
)
{
syncNodeRelease
(
pNode
);
syncNodeRemove
(
rid
);
}
}
bool
syncNodeCheckNewConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
pNewCfg
)
{
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewCfg
);
if
(
!
IamInNew
)
{
return
false
;
}
if
(
pNewCfg
->
replicaNum
>
pSyncNode
->
replicaNum
+
1
)
{
return
false
;
}
if
(
pNewCfg
->
replicaNum
<
pSyncNode
->
replicaNum
-
1
)
{
return
false
;
}
return
true
;
}
int32_t
syncReconfigBuild
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
ret
=
0
;
if
(
!
syncNodeCheckNewConfig
(
pSyncNode
,
pNewCfg
))
{
if
(
pSyncNode
!=
NULL
)
{
syncNodeRelease
(
pSyncNode
);
terrno
=
TSDB_CODE_SYN_NEW_CONFIG_ERROR
;
sError
(
"invalid new config. vgId:%d"
,
pSyncNode
->
vgId
);
return
-
1
;
syncNodeRemove
(
rid
);
}
}
char
*
newconfig
=
syncCfg2Str
((
SSyncCfg
*
)
pNewCfg
);
pRpcMsg
->
msgType
=
TDMT_SYNC_CONFIG_CHANGE
;
pRpcMsg
->
info
.
noResp
=
1
;
pRpcMsg
->
contLen
=
strlen
(
newconfig
)
+
1
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
snprintf
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
"%s"
,
newconfig
);
taosMemoryFree
(
newconfig
);
syncNodeRelease
(
pSyncNode
);
return
ret
;
static
bool
syncNodeCheckNewConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
pCfg
)
{
if
(
!
syncNodeInConfig
(
pSyncNode
,
pCfg
))
return
false
;
return
abs
(
pCfg
->
replicaNum
-
pSyncNode
->
replicaNum
)
<=
1
;
}
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pNewCfg
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
==
NULL
)
return
-
1
;
if
(
!
syncNodeCheckNewConfig
(
pSyncNode
,
pNewCfg
))
{
syncNodeRelease
(
pSyncNode
);
terrno
=
TSDB_CODE_SYN_NEW_CONFIG_ERROR
;
sError
(
"
invalid new config. vgId:%d
"
,
pSyncNode
->
vgId
);
sError
(
"
vgId:%d, failed to reconfig since invalid new config
"
,
pSyncNode
->
vgId
);
return
-
1
;
}
#if 0
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
int32_t ret = 0;
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
rpcMsg.info.noResp = 1;
rpcMsg.contLen = strlen(newconfig) + 1;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
taosMemoryFree(newconfig);
ret = syncNodePropose(pSyncNode, &rpcMsg, false);
syncNodeRelease(pSyncNode);
return ret;
#else
syncNodeUpdateNewConfigIndex
(
pSyncNode
,
pNewCfg
);
syncNodeDoConfigChange
(
pSyncNode
,
pNewCfg
,
SYNC_INDEX_INVALID
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
syncNodeStopHeartbeatTimer
(
pSyncNode
);
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
syncHbTimerInit
(
pSyncNode
,
&
(
pSyncNode
->
peerHeartbeatTimerArr
[
i
]),
(
pSyncNode
->
replicasId
)
[
i
]);
syncHbTimerInit
(
pSyncNode
,
&
pSyncNode
->
peerHeartbeatTimerArr
[
i
],
pSyncNode
->
replicasId
[
i
]);
}
syncNodeStartHeartbeatTimer
(
pSyncNode
);
syncNodeReplicate
(
pSyncNode
);
}
syncNodeRelease
(
pSyncNode
);
return
0
;
#endif
}
int32_t
syncLeaderTransfer
(
int64_t
rid
)
{
int32_t
syncProcessMsg
(
int64_t
rid
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
-
1
;
if
(
!
syncIsInit
())
return
code
;
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
if
(
pSyncNode
==
NULL
)
return
code
;
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT
)
{
SyncHeartbeat
*
pSyncMsg
=
syncHeartbeatFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeat
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT_REPLY
)
{
SyncHeartbeatReply
*
pSyncMsg
=
syncHeartbeatReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeatReply
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnTimer
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnPing
(
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnPingReply
(
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnClientRequest
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVote
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE_REPLY
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVoteReply
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntries
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES_REPLY
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntriesReply
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshot
(
pSyncNode
,
pSyncMsg
);
syncSnapshotSendDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_RSP
)
{
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshotReply
(
pSyncNode
,
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_LOCAL_CMD
)
{
SyncLocalCmd
*
pSyncMsg
=
syncLocalCmdFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnLocalCmd
(
pSyncNode
,
pSyncMsg
);
syncLocalCmdDestroy
(
pSyncMsg
);
}
else
{
sError
(
"vgId:%d, failed to process msg:%p since invalid type:%s"
,
pSyncNode
->
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
ret
=
syncNodeLeaderTransfer
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
ret
;
return
code
;
}
int32_t
syncLeaderTransfer
To
(
int64_t
rid
,
SNodeInfo
newLeader
)
{
int32_t
syncLeaderTransfer
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
==
NULL
)
return
-
1
;
int32_t
ret
=
syncNodeLeaderTransfer
To
(
pSyncNode
,
newLeader
);
int32_t
ret
=
syncNodeLeaderTransfer
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
ret
;
}
...
...
@@ -3676,4 +3662,4 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-local-cmd {cmd:%d-%s, sd-new-term:%"
PRIu64
"}, %s"
,
pMsg
->
cmd
,
syncLocalCmdGetStr
(
pMsg
->
cmd
),
pMsg
->
sdNewTerm
,
s
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
\ No newline at end of file
}
tests/script/jenkins/basic.txt
浏览文件 @
60bb2c18
...
...
@@ -322,7 +322,7 @@
# --- vnode ----
./test.sh -f tsim/vnode/replica3_basic.sim
# TD-20089
./test.sh -f tsim/vnode/replica3_repeat.sim
./test.sh -f tsim/vnode/replica3_repeat.sim
./test.sh -f tsim/vnode/replica3_vgroup.sim
./test.sh -f tsim/vnode/replica3_many.sim
./test.sh -f tsim/vnode/replica3_import.sim
...
...
tests/script/tsim/vnode/replica3_repeat.sim
浏览文件 @
60bb2c18
...
...
@@ -107,7 +107,16 @@ system sh/exec.sh -n dnode2 -s stop
sleep 3000
print ======== step7
sql select count(*) from db.tb
$x = 0
step7:
$x = $x + 1
sleep 1000
if $x == 30 then
print ====> dnode not ready!
return -1
endi
sql select count(*) from db.tb -x step7
print select count(*) from db.tb ==> $data00 $lastRows
if $data00 <= $lastRows then
return -1
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录