Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d5aea7da
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d5aea7da
编写于
8月 02, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: adjust sync log
上级
895e0c55
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
114 addition
and
73 deletion
+114
-73
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-1
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+1
-1
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+2
-2
source/dnode/vnode/src/meta/metaSma.c
source/dnode/vnode/src/meta/metaSma.c
+2
-2
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+4
-4
source/dnode/vnode/src/tsdb/tsdbOpen.c
source/dnode/vnode/src/tsdb/tsdbOpen.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-2
source/dnode/vnode/src/vnd/vnodeBufPool.c
source/dnode/vnode/src/vnd/vnodeBufPool.c
+1
-1
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+4
-4
source/dnode/vnode/src/vnd/vnodeModule.c
source/dnode/vnode/src/vnd/vnodeModule.c
+1
-1
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+2
-2
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+2
-2
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+9
-10
source/libs/index/src/index.c
source/libs/index/src/index.c
+2
-2
source/libs/index/src/indexTfile.c
source/libs/index/src/indexTfile.c
+4
-4
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+19
-21
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+4
-4
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+6
-9
tests/script/tmp/r1.sim
tests/script/tmp/r1.sim
+47
-0
未找到文件。
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
d5aea7da
...
@@ -146,7 +146,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
...
@@ -146,7 +146,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
{
if
(
pVnode
==
NULL
)
{
dGError
(
"vgId:%d, msg:%p failed to put into vnode queue since %s,
msg
type:%s qtype:%d"
,
pHead
->
vgId
,
pMsg
,
dGError
(
"vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d"
,
pHead
->
vgId
,
pMsg
,
terrstr
(),
TMSG_INFO
(
pMsg
->
msgType
),
qtype
);
terrstr
(),
TMSG_INFO
(
pMsg
->
msgType
),
qtype
);
return
terrno
!=
0
?
terrno
:
-
1
;
return
terrno
!=
0
?
terrno
:
-
1
;
}
}
...
...
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
d5aea7da
...
@@ -133,7 +133,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
...
@@ -133,7 +133,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
ret
=
tdbTbOpen
(
"stream.task.db"
,
sizeof
(
int64_t
),
-
1
,
taskIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pStreamDb
);
ret
=
tdbTbOpen
(
"stream.task.db"
,
sizeof
(
int64_t
),
-
1
,
taskIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pStreamDb
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
metaError
(
"vgId
: %
d, failed to open meta stream task index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
metaError
(
"vgIdd, failed to open meta stream task index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
goto
_err
;
}
}
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
d5aea7da
...
@@ -599,7 +599,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
...
@@ -599,7 +599,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
for
(
int
i
=
0
;
i
<
pSW
->
number
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pSW
->
number
;
++
i
)
{
smaId
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pSmaIds
,
i
);
smaId
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pSmaIds
,
i
);
if
(
metaGetTableEntryByUid
(
&
mr
,
smaId
)
<
0
)
{
if
(
metaGetTableEntryByUid
(
&
mr
,
smaId
)
<
0
)
{
metaWarn
(
"vgId:%d, no entry for tbId
: %"
PRIi64
", smaId: %
"
PRIi64
,
TD_VID
(
pMeta
->
pVnode
),
uid
,
smaId
);
metaWarn
(
"vgId:%d, no entry for tbId
"
PRIi64
", smaId
"
PRIi64
,
TD_VID
(
pMeta
->
pVnode
),
uid
,
smaId
);
continue
;
continue
;
}
}
pTSma
=
pSW
->
tSma
+
smaIdx
;
pTSma
=
pSW
->
tSma
+
smaIdx
;
...
@@ -647,7 +647,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
...
@@ -647,7 +647,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
SMetaReader
mr
=
{
0
};
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pMeta
,
0
);
metaReaderInit
(
&
mr
,
pMeta
,
0
);
if
(
metaGetTableEntryByUid
(
&
mr
,
indexUid
)
<
0
)
{
if
(
metaGetTableEntryByUid
(
&
mr
,
indexUid
)
<
0
)
{
metaWarn
(
"vgId:%d, failed to get table entry for smaId
: %
"
PRIi64
,
TD_VID
(
pMeta
->
pVnode
),
indexUid
);
metaWarn
(
"vgId:%d, failed to get table entry for smaId"
PRIi64
,
TD_VID
(
pMeta
->
pVnode
),
indexUid
);
metaReaderClear
(
&
mr
);
metaReaderClear
(
&
mr
);
return
NULL
;
return
NULL
;
}
}
...
...
source/dnode/vnode/src/meta/metaSma.c
浏览文件 @
d5aea7da
...
@@ -57,12 +57,12 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
...
@@ -57,12 +57,12 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
if
(
metaHandleSmaEntry
(
pMeta
,
&
me
)
<
0
)
goto
_err
;
if
(
metaHandleSmaEntry
(
pMeta
,
&
me
)
<
0
)
goto
_err
;
metaDebug
(
"vgId:%d, tsma is created, name:%s uid:
%"
PRId64
,
TD_VID
(
pMeta
->
pVnode
),
pCfg
->
indexName
,
pCfg
->
indexUid
);
metaDebug
(
"vgId:%d, tsma is created, name:%s uid:%"
PRId64
,
TD_VID
(
pMeta
->
pVnode
),
pCfg
->
indexName
,
pCfg
->
indexUid
);
return
0
;
return
0
;
_err:
_err:
metaError
(
"vgId:%d, failed to create tsma
: %s uid:
%"
PRId64
" since %s"
,
TD_VID
(
pMeta
->
pVnode
),
pCfg
->
indexName
,
metaError
(
"vgId:%d, failed to create tsma
s uid:
%"
PRId64
" since %s"
,
TD_VID
(
pMeta
->
pVnode
),
pCfg
->
indexName
,
pCfg
->
indexUid
,
tstrerror
(
terrno
));
pCfg
->
indexUid
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
d5aea7da
...
@@ -204,12 +204,12 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
...
@@ -204,12 +204,12 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
++
pMeta
->
pVnode
->
config
.
vndStats
.
numOfSTables
;
++
pMeta
->
pVnode
->
config
.
vndStats
.
numOfSTables
;
metaDebug
(
"vgId:%d, super table is created, name:%s uid:
%"
PRId64
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
);
metaDebug
(
"vgId:%d, super table is created, name:%s uid:%"
PRId64
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
);
return
0
;
return
0
;
_err:
_err:
metaError
(
"vgId:%d, failed to create super table
: %s uid:
%"
PRId64
" since %s"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
metaError
(
"vgId:%d, failed to create super table
s uid:
%"
PRId64
" since %s"
,
TD_VID
(
pMeta
->
pVnode
),
pReq
->
name
,
pReq
->
suid
,
tstrerror
(
terrno
));
pReq
->
suid
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
...
@@ -996,7 +996,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -996,7 +996,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tbDbKey
.
version
=
pME
->
version
;
tbDbKey
.
version
=
pME
->
version
;
tbDbKey
.
uid
=
pME
->
uid
;
tbDbKey
.
uid
=
pME
->
uid
;
metaDebug
(
"vgId:%d, start to save table version:%"
PRId64
"
uid:
%"
PRId64
,
TD_VID
(
pMeta
->
pVnode
),
pME
->
version
,
metaDebug
(
"vgId:%d, start to save table version:%"
PRId64
"
uid:
%"
PRId64
,
TD_VID
(
pMeta
->
pVnode
),
pME
->
version
,
pME
->
uid
);
pME
->
uid
);
pKey
=
&
tbDbKey
;
pKey
=
&
tbDbKey
;
...
@@ -1031,7 +1031,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
...
@@ -1031,7 +1031,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
return
0
;
return
0
;
_err:
_err:
metaError
(
"vgId:%d, failed to save table version:%"
PRId64
"uid:
%"
PRId64
" %s"
,
TD_VID
(
pMeta
->
pVnode
),
pME
->
version
,
metaError
(
"vgId:%d, failed to save table version:%"
PRId64
"uid:%"
PRId64
" %s"
,
TD_VID
(
pMeta
->
pVnode
),
pME
->
version
,
pME
->
uid
,
tstrerror
(
terrno
));
pME
->
uid
,
tstrerror
(
terrno
));
taosMemoryFree
(
pVal
);
taosMemoryFree
(
pVal
);
...
...
source/dnode/vnode/src/tsdb/tsdbOpen.c
浏览文件 @
d5aea7da
...
@@ -71,7 +71,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
...
@@ -71,7 +71,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
goto
_err
;
goto
_err
;
}
}
tsdbDebug
(
"vgId:%d, tsdb is opened
for
%s, days:%d, keep:%d,%d,%d"
,
TD_VID
(
pVnode
),
pTsdb
->
path
,
pTsdb
->
keepCfg
.
days
,
tsdbDebug
(
"vgId:%d, tsdb is opened
at
%s, days:%d, keep:%d,%d,%d"
,
TD_VID
(
pVnode
),
pTsdb
->
path
,
pTsdb
->
keepCfg
.
days
,
pTsdb
->
keepCfg
.
keep0
,
pTsdb
->
keepCfg
.
keep1
,
pTsdb
->
keepCfg
.
keep2
);
pTsdb
->
keepCfg
.
keep0
,
pTsdb
->
keepCfg
.
keep1
,
pTsdb
->
keepCfg
.
keep2
);
*
ppTsdb
=
pTsdb
;
*
ppTsdb
=
pTsdb
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
d5aea7da
...
@@ -1197,7 +1197,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
...
@@ -1197,7 +1197,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
setComposedBlockFlag
(
pReader
,
true
);
setComposedBlockFlag
(
pReader
,
true
);
double
elapsedTime
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
double
elapsedTime
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
tsdbDebug
(
"%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange
: %
"
PRId64
tsdbDebug
(
"%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange"
PRId64
" - %"
PRId64
" %s"
,
" - %"
PRId64
" %s"
,
pReader
,
elapsedTime
,
pBlock
->
info
.
rows
,
pBlock
->
info
.
window
.
skey
,
pBlock
->
info
.
window
.
ekey
,
pReader
,
elapsedTime
,
pBlock
->
info
.
rows
,
pBlock
->
info
.
window
.
skey
,
pBlock
->
info
.
window
.
ekey
,
pReader
->
idStr
);
pReader
->
idStr
);
...
@@ -2647,7 +2647,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
...
@@ -2647,7 +2647,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
return
code
;
return
code
;
_err:
_err:
tsdbError
(
"failed to create data reader, code
: %
s %s"
,
tstrerror
(
code
),
pReader
->
idStr
);
tsdbError
(
"failed to create data reader, codes %s"
,
tstrerror
(
code
),
pReader
->
idStr
);
return
code
;
return
code
;
}
}
...
...
source/dnode/vnode/src/vnd/vnodeBufPool.c
浏览文件 @
d5aea7da
...
@@ -40,7 +40,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
...
@@ -40,7 +40,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
pVnode
->
pPool
=
pPool
;
pVnode
->
pPool
=
pPool
;
}
}
vDebug
(
"vgId:%d, vnode buffer pool is opened,
pool size:
%"
PRId64
,
TD_VID
(
pVnode
),
size
);
vDebug
(
"vgId:%d, vnode buffer pool is opened,
size:
%"
PRId64
,
TD_VID
(
pVnode
),
size
);
return
0
;
return
0
;
}
}
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
d5aea7da
...
@@ -95,19 +95,19 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
...
@@ -95,19 +95,19 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
// save info to a vnode_tmp.json
// save info to a vnode_tmp.json
pFile
=
taosOpenFile
(
fname
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
pFile
=
taosOpenFile
(
fname
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
if
(
pFile
==
NULL
)
{
vError
(
"failed to open info file:
%s for write:
%s"
,
fname
,
terrstr
());
vError
(
"failed to open info file:
%s for write:
%s"
,
fname
,
terrstr
());
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
}
}
if
(
taosWriteFile
(
pFile
,
data
,
strlen
(
data
))
<
0
)
{
if
(
taosWriteFile
(
pFile
,
data
,
strlen
(
data
))
<
0
)
{
vError
(
"failed to write info file:
%s data:
%s"
,
fname
,
terrstr
());
vError
(
"failed to write info file:
%s data:
%s"
,
fname
,
terrstr
());
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
if
(
taosFsyncFile
(
pFile
)
<
0
)
{
if
(
taosFsyncFile
(
pFile
)
<
0
)
{
vError
(
"failed to fsync info file:
%s error:
%s"
,
fname
,
terrstr
());
vError
(
"failed to fsync info file:
%s error:
%s"
,
fname
,
terrstr
());
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
...
@@ -117,7 +117,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
...
@@ -117,7 +117,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
// free info binary
// free info binary
taosMemoryFree
(
data
);
taosMemoryFree
(
data
);
vInfo
(
"vgId:%d, vnode info is saved, fname:
%s"
,
pInfo
->
config
.
vgId
,
fname
);
vInfo
(
"vgId:%d, vnode info is saved, fname:%s"
,
pInfo
->
config
.
vgId
,
fname
);
return
0
;
return
0
;
...
...
source/dnode/vnode/src/vnd/vnodeModule.c
浏览文件 @
d5aea7da
...
@@ -55,7 +55,7 @@ int vnodeInit(int nthreads) {
...
@@ -55,7 +55,7 @@ int vnodeInit(int nthreads) {
vnodeGlobal
.
threads
=
taosMemoryCalloc
(
nthreads
,
sizeof
(
TdThread
));
vnodeGlobal
.
threads
=
taosMemoryCalloc
(
nthreads
,
sizeof
(
TdThread
));
if
(
vnodeGlobal
.
threads
==
NULL
)
{
if
(
vnodeGlobal
.
threads
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
vError
(
"failed to init vnode module since:
%s"
,
tstrerror
(
terrno
));
vError
(
"failed to init vnode module since:%s"
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
d5aea7da
...
@@ -23,13 +23,13 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
...
@@ -23,13 +23,13 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
// check config
// check config
if
(
vnodeCheckCfg
(
pCfg
)
<
0
)
{
if
(
vnodeCheckCfg
(
pCfg
)
<
0
)
{
vError
(
"vgId:%d, failed to create vnode since:
%s"
,
pCfg
->
vgId
,
tstrerror
(
terrno
));
vError
(
"vgId:%d, failed to create vnode since:%s"
,
pCfg
->
vgId
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
// create vnode env
// create vnode env
if
(
tfsMkdirAt
(
pTfs
,
path
,
(
SDiskID
){
0
})
<
0
)
{
if
(
tfsMkdirAt
(
pTfs
,
path
,
(
SDiskID
){
0
})
<
0
)
{
vError
(
"vgId:%d, failed to create vnode since:
%s"
,
pCfg
->
vgId
,
tstrerror
(
terrno
));
vError
(
"vgId:%d, failed to create vnode since:%s"
,
pCfg
->
vgId
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
d5aea7da
...
@@ -262,7 +262,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
...
@@ -262,7 +262,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
return
0
;
return
0
;
_err:
_err:
vError
(
"vgId:%d, process %s request failed since %s, version
: %
"
PRId64
,
TD_VID
(
pVnode
),
TMSG_INFO
(
pMsg
->
msgType
),
vError
(
"vgId:%d, process %s request failed since %s, version"
PRId64
,
TD_VID
(
pVnode
),
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
terrno
),
version
);
tstrerror
(
terrno
),
version
);
return
-
1
;
return
-
1
;
}
}
...
@@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
...
@@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
}
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
)
{
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
)
{
vTrace
(
"
message in fetch queue is processing"
);
vTrace
(
"
vgId:%d, msg:%p in fetch queue is processing"
,
pVnode
->
config
.
vgId
,
pMsg
);
if
((
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_VND_TABLE_META
||
pMsg
->
msgType
==
TDMT_VND_TABLE_CFG
||
if
((
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_VND_TABLE_META
||
pMsg
->
msgType
==
TDMT_VND_TABLE_CFG
||
pMsg
->
msgType
==
TDMT_VND_BATCH_META
)
&&
pMsg
->
msgType
==
TDMT_VND_BATCH_META
)
&&
!
vnodeIsLeader
(
pVnode
))
{
!
vnodeIsLeader
(
pVnode
))
{
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
d5aea7da
...
@@ -518,15 +518,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
...
@@ -518,15 +518,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
rpcMsg
.
info
.
conn
.
applyTerm
=
cbMeta
.
term
;
rpcMsg
.
info
.
conn
.
applyTerm
=
cbMeta
.
term
;
vInfo
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
vInfo
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
",
isWeak:%d, code:%d, state:%d %s, msgtype:%d
%s"
,
",
weak:%d, code:%d, state:%d %s, type:
%s"
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
cbMeta
.
index
,
cbMeta
.
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
cbMeta
.
isWeak
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
cbMeta
.
index
,
cbMeta
.
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
));
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
TMSG_INFO
(
pMsg
->
msgType
));
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
}
else
{
}
else
{
SRpcMsg
rsp
=
{.
code
=
cbMeta
.
code
,
.
info
=
pMsg
->
info
};
SRpcMsg
rsp
=
{.
code
=
cbMeta
.
code
,
.
info
=
pMsg
->
info
};
vError
(
"vgId:%d,
sync commit error, msgtype:%d,%s, index:%ld, error:0x%X, errmsg:
%s"
,
syncGetVgId
(
pVnode
->
sync
),
vError
(
"vgId:%d,
commit-cb execute error, type:%s, index:%"
PRId64
", error:0x%x
%s"
,
syncGetVgId
(
pVnode
->
sync
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
),
cbMeta
.
index
,
cbMeta
.
code
,
tstrerror
(
cbMeta
.
code
));
TMSG_INFO
(
pMsg
->
msgType
),
cbMeta
.
index
,
cbMeta
.
code
,
tstrerror
(
cbMeta
.
code
));
if
(
rsp
.
info
.
handle
!=
NULL
)
{
if
(
rsp
.
info
.
handle
!=
NULL
)
{
tmsgSendRsp
(
&
rsp
);
tmsgSendRsp
(
&
rsp
);
}
}
...
@@ -537,10 +537,9 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
...
@@ -537,10 +537,9 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
static
void
vnodeSyncPreCommitMsg
(
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
static
void
vnodeSyncPreCommitMsg
(
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
if
(
cbMeta
.
isWeak
==
1
)
{
if
(
cbMeta
.
isWeak
==
1
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
SVnode
*
pVnode
=
pFsm
->
data
;
vTrace
(
"vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%"
PRId64
vTrace
(
"vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s"
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
));
syncUtilState2String
(
cbMeta
.
state
),
TMSG_INFO
(
pMsg
->
msgType
));
if
(
cbMeta
.
code
==
0
)
{
if
(
cbMeta
.
code
==
0
)
{
SRpcMsg
rpcMsg
=
{.
msgType
=
pMsg
->
msgType
,
.
contLen
=
pMsg
->
contLen
};
SRpcMsg
rpcMsg
=
{.
msgType
=
pMsg
->
msgType
,
.
contLen
=
pMsg
->
contLen
};
...
@@ -552,8 +551,8 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
...
@@ -552,8 +551,8 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
}
else
{
}
else
{
SRpcMsg
rsp
=
{.
code
=
cbMeta
.
code
,
.
info
=
pMsg
->
info
};
SRpcMsg
rsp
=
{.
code
=
cbMeta
.
code
,
.
info
=
pMsg
->
info
};
vError
(
"vgId:%d,
sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:
%s"
,
syncGetVgId
(
pVnode
->
sync
),
vError
(
"vgId:%d,
pre-commit-cb execute error, type:%s, error:0x%x
%s"
,
syncGetVgId
(
pVnode
->
sync
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
),
cbMeta
.
code
,
tstrerror
(
cbMeta
.
code
));
TMSG_INFO
(
pMsg
->
msgType
),
cbMeta
.
code
,
tstrerror
(
cbMeta
.
code
));
if
(
rsp
.
info
.
handle
!=
NULL
)
{
if
(
rsp
.
info
.
handle
!=
NULL
)
{
tmsgSendRsp
(
&
rsp
);
tmsgSendRsp
(
&
rsp
);
}
}
...
@@ -563,7 +562,7 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
...
@@ -563,7 +562,7 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
static
void
vnodeSyncRollBackMsg
(
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
static
void
vnodeSyncRollBackMsg
(
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
SVnode
*
pVnode
=
pFsm
->
data
;
vTrace
(
"vgId:%d, rollback-cb is excuted, fsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s, msg
t
ype:%d %s"
,
vTrace
(
"vgId:%d, rollback-cb is excuted, fsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s, msg
T
ype:%d %s"
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
));
syncUtilState2String
(
cbMeta
.
state
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
));
}
}
...
...
source/libs/index/src/index.c
浏览文件 @
d5aea7da
...
@@ -220,7 +220,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
...
@@ -220,7 +220,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
),
.
colType
=
p
->
colType
};
ICacheKey
key
=
{.
suid
=
p
->
suid
,
.
colName
=
p
->
colName
,
.
nColName
=
strlen
(
p
->
colName
),
.
colType
=
p
->
colType
};
int32_t
sz
=
idxSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
idxSerialCacheKey
(
&
key
,
buf
);
indexDebug
(
"w suid:
%"
PRIu64
", colName: %s, colType: %d"
,
key
.
suid
,
key
.
colName
,
key
.
colType
);
indexDebug
(
"w suid:%"
PRIu64
", colName: %s, colType: %d"
,
key
.
suid
,
key
.
colName
,
key
.
colType
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
assert
(
*
cache
!=
NULL
);
assert
(
*
cache
!=
NULL
);
...
@@ -395,7 +395,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
...
@@ -395,7 +395,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
ICacheKey
key
=
{
ICacheKey
key
=
{
.
suid
=
term
->
suid
,
.
colName
=
term
->
colName
,
.
nColName
=
strlen
(
term
->
colName
),
.
colType
=
term
->
colType
};
.
suid
=
term
->
suid
,
.
colName
=
term
->
colName
,
.
nColName
=
strlen
(
term
->
colName
),
.
colType
=
term
->
colType
};
indexDebug
(
"r suid:
%"
PRIu64
", colName: %s, colType: %d"
,
key
.
suid
,
key
.
colName
,
key
.
colType
);
indexDebug
(
"r suid:%"
PRIu64
", colName: %s, colType: %d"
,
key
.
suid
,
key
.
colName
,
key
.
colType
);
int32_t
sz
=
idxSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
idxSerialCacheKey
(
&
key
,
buf
);
taosThreadMutexLock
(
&
sIdx
->
mtx
);
taosThreadMutexLock
(
&
sIdx
->
mtx
);
...
...
source/libs/index/src/indexTfile.c
浏览文件 @
d5aea7da
...
@@ -141,7 +141,7 @@ void tfileCacheDestroy(TFileCache* tcache) {
...
@@ -141,7 +141,7 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader
**
reader
=
taosHashIterate
(
tcache
->
tableCache
,
NULL
);
TFileReader
**
reader
=
taosHashIterate
(
tcache
->
tableCache
,
NULL
);
while
(
reader
)
{
while
(
reader
)
{
TFileReader
*
p
=
*
reader
;
TFileReader
*
p
=
*
reader
;
indexInfo
(
"drop table cache suid:
%"
PRIu64
", colName: %s, colType: %d"
,
p
->
header
.
suid
,
p
->
header
.
colName
,
indexInfo
(
"drop table cache suid:%"
PRIu64
", colName: %s, colType: %d"
,
p
->
header
.
suid
,
p
->
header
.
colName
,
p
->
header
.
colType
);
p
->
header
.
colType
);
tfileReaderUnRef
(
p
);
tfileReaderUnRef
(
p
);
reader
=
taosHashIterate
(
tcache
->
tableCache
,
reader
);
reader
=
taosHashIterate
(
tcache
->
tableCache
,
reader
);
...
@@ -185,20 +185,20 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) {
...
@@ -185,20 +185,20 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) {
reader
->
ctx
=
ctx
;
reader
->
ctx
=
ctx
;
if
(
0
!=
tfileReaderVerify
(
reader
))
{
if
(
0
!=
tfileReaderVerify
(
reader
))
{
indexError
(
"invalid tfile, suid:
%"
PRIu64
", colName: %s"
,
reader
->
header
.
suid
,
reader
->
header
.
colName
);
indexError
(
"invalid tfile, suid:%"
PRIu64
", colName: %s"
,
reader
->
header
.
suid
,
reader
->
header
.
colName
);
tfileReaderDestroy
(
reader
);
tfileReaderDestroy
(
reader
);
return
NULL
;
return
NULL
;
}
}
// T_REF_INC(reader);
// T_REF_INC(reader);
if
(
0
!=
tfileReaderLoadHeader
(
reader
))
{
if
(
0
!=
tfileReaderLoadHeader
(
reader
))
{
indexError
(
"failed to load index header, suid:
%"
PRIu64
", colName: %s"
,
reader
->
header
.
suid
,
indexError
(
"failed to load index header, suid:%"
PRIu64
", colName: %s"
,
reader
->
header
.
suid
,
reader
->
header
.
colName
);
reader
->
header
.
colName
);
tfileReaderDestroy
(
reader
);
tfileReaderDestroy
(
reader
);
return
NULL
;
return
NULL
;
}
}
if
(
0
!=
tfileReaderLoadFst
(
reader
))
{
if
(
0
!=
tfileReaderLoadFst
(
reader
))
{
indexError
(
"failed to load index fst, suid:
%"
PRIu64
", colName: %s, errno: %d"
,
reader
->
header
.
suid
,
indexError
(
"failed to load index fst, suid:%"
PRIu64
", colName: %s, errno: %d"
,
reader
->
header
.
suid
,
reader
->
header
.
colName
,
errno
);
reader
->
header
.
colName
,
errno
);
tfileReaderDestroy
(
reader
);
tfileReaderDestroy
(
reader
);
return
NULL
;
return
NULL
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
d5aea7da
...
@@ -94,7 +94,7 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) {
...
@@ -94,7 +94,7 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) {
return
-
1
;
return
-
1
;
}
}
sDebug
(
"vgId:%d, rid:%"
PRId64
" is added to rsetId:%"
PRId64
,
pSyncInfo
->
vgId
,
pSyncNode
->
rid
,
tsNodeRefId
);
sDebug
(
"vgId:%d,
sync
rid:%"
PRId64
" is added to rsetId:%"
PRId64
,
pSyncInfo
->
vgId
,
pSyncNode
->
rid
,
tsNodeRefId
);
return
pSyncNode
->
rid
;
return
pSyncNode
->
rid
;
}
}
...
@@ -142,7 +142,7 @@ void syncStop(int64_t rid) {
...
@@ -142,7 +142,7 @@ void syncStop(int64_t rid) {
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosRemoveRef
(
tsNodeRefId
,
rid
);
taosRemoveRef
(
tsNodeRefId
,
rid
);
sDebug
(
"vgId:%d, rid:%"
PRId64
" is removed from rsetId:%"
PRId64
,
vgId
,
rid
,
tsNodeRefId
);
sDebug
(
"vgId:%d,
sync
rid:%"
PRId64
" is removed from rsetId:%"
PRId64
,
vgId
,
rid
,
tsNodeRefId
);
}
}
int32_t
syncSetStandby
(
int64_t
rid
)
{
int32_t
syncSetStandby
(
int64_t
rid
)
{
...
@@ -730,8 +730,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
...
@@ -730,8 +730,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
for
(
int
i
=
0
;
i
<
arrSize
;
++
i
)
{
for
(
int
i
=
0
;
i
<
arrSize
;
++
i
)
{
do
{
do
{
char
eventLog
[
128
];
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"propose type:%s,%d, batch:%d"
,
TMSG_INFO
(
pMsgPArr
[
i
]
->
msgType
),
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"propose type:%s, batch:%d"
,
TMSG_INFO
(
pMsgPArr
[
i
]
->
msgType
),
arrSize
);
pMsgPArr
[
i
]
->
msgType
,
arrSize
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
while
(
0
);
}
while
(
0
);
...
@@ -791,7 +790,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
...
@@ -791,7 +790,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
do
{
do
{
char
eventLog
[
128
];
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"propose type:%s
,%d"
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"propose type:%s
"
,
TMSG_INFO
(
pMsg
->
msgType
)
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
while
(
0
);
}
while
(
0
);
...
@@ -799,7 +798,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
...
@@ -799,7 +798,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if
(
pSyncNode
->
changing
&&
pMsg
->
msgType
!=
TDMT_SYNC_CONFIG_CHANGE_FINISH
)
{
if
(
pSyncNode
->
changing
&&
pMsg
->
msgType
!=
TDMT_SYNC_CONFIG_CHANGE_FINISH
)
{
ret
=
-
1
;
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_PROPOSE_NOT_READY
;
terrno
=
TSDB_CODE_SYN_PROPOSE_NOT_READY
;
sError
(
"vgId:%d,
sync propose not ready, type:%s,%d"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
sError
(
"vgId:%d,
failed to sync propose since not ready, type:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
)
);
goto
_END
;
goto
_END
;
}
}
...
@@ -808,8 +807,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
...
@@ -808,8 +807,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if
(
!
syncNodeCanChange
(
pSyncNode
))
{
if
(
!
syncNodeCanChange
(
pSyncNode
))
{
ret
=
-
1
;
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_RECONFIG_NOT_READY
;
terrno
=
TSDB_CODE_SYN_RECONFIG_NOT_READY
;
sError
(
"vgId:%d, sync reconfig not ready, type:%s,%d"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
),
sError
(
"vgId:%d, failed to sync reconfig since not ready, type:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
pMsg
->
msgType
);
goto
_END
;
goto
_END
;
}
}
...
@@ -836,13 +834,12 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
...
@@ -836,13 +834,12 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
rpcFreeCont
(
rpcMsg
.
pCont
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncRespMgrDel
(
pSyncNode
->
pSyncRespMgr
,
seqNum
);
syncRespMgrDel
(
pSyncNode
->
pSyncRespMgr
,
seqNum
);
ret
=
1
;
ret
=
1
;
sDebug
(
"vgId:%d, optimized index:%"
PRId64
" success, msgtype:%s,%d"
,
pSyncNode
->
vgId
,
retIndex
,
sDebug
(
"vgId:%d, sync optimize index:%"
PRId64
", type:%s"
,
pSyncNode
->
vgId
,
retIndex
,
TMSG_INFO
(
pMsg
->
msgType
));
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
}
else
{
}
else
{
ret
=
-
1
;
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
sError
(
"vgId:%d,
optimized index:%"
PRId64
" error, msgtype:%s,%d
"
,
pSyncNode
->
vgId
,
retIndex
,
sError
(
"vgId:%d,
failed to sync optimize index:%"
PRId64
", type:%s
"
,
pSyncNode
->
vgId
,
retIndex
,
TMSG_INFO
(
pMsg
->
msgType
)
,
pMsg
->
msgType
);
TMSG_INFO
(
pMsg
->
msgType
));
}
}
}
else
{
}
else
{
...
@@ -851,7 +848,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
...
@@ -851,7 +848,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
}
else
{
}
else
{
ret
=
-
1
;
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
sError
(
"vgId:%d,
enqueue msg error, FpEqMsg is NULL
"
,
pSyncNode
->
vgId
);
sError
(
"vgId:%d,
failed to enqueue msg since its null
"
,
pSyncNode
->
vgId
);
}
}
}
}
...
@@ -861,8 +858,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
...
@@ -861,8 +858,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
}
else
{
}
else
{
ret
=
-
1
;
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
sError
(
"vgId:%d, sync propose not leader, %s, msg
type:%s,%d
"
,
pSyncNode
->
vgId
,
sError
(
"vgId:%d, sync propose not leader, %s, msg
Type:%d,%s
"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
syncUtilState2String
(
pSyncNode
->
state
),
pMsg
->
msgType
,
TMSG_INFO
(
pMsg
->
msgType
)
);
goto
_END
;
goto
_END
;
}
}
...
@@ -887,7 +884,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
...
@@ -887,7 +884,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
}
}
}
}
snprintf
(
pSyncNode
->
configPath
,
sizeof
(
pSyncNode
->
configPath
),
"%s
/raft_config.json"
,
pSyncInfo
->
path
);
snprintf
(
pSyncNode
->
configPath
,
sizeof
(
pSyncNode
->
configPath
),
"%s
%sraft_config.json"
,
pSyncInfo
->
path
,
TD_DIRSEP
);
if
(
!
taosCheckExistFile
(
pSyncNode
->
configPath
))
{
if
(
!
taosCheckExistFile
(
pSyncNode
->
configPath
))
{
// create a new raft config file
// create a new raft config file
SRaftCfgMeta
meta
;
SRaftCfgMeta
meta
;
...
@@ -910,8 +907,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
...
@@ -910,8 +907,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init by SSyncInfo
// init by SSyncInfo
pSyncNode
->
vgId
=
pSyncInfo
->
vgId
;
pSyncNode
->
vgId
=
pSyncInfo
->
vgId
;
memcpy
(
pSyncNode
->
path
,
pSyncInfo
->
path
,
sizeof
(
pSyncNode
->
path
));
memcpy
(
pSyncNode
->
path
,
pSyncInfo
->
path
,
sizeof
(
pSyncNode
->
path
));
snprintf
(
pSyncNode
->
raftStorePath
,
sizeof
(
pSyncNode
->
raftStorePath
),
"%s/raft_store.json"
,
pSyncInfo
->
path
);
snprintf
(
pSyncNode
->
raftStorePath
,
sizeof
(
pSyncNode
->
raftStorePath
),
"%s%sraft_store.json"
,
pSyncInfo
->
path
,
snprintf
(
pSyncNode
->
configPath
,
sizeof
(
pSyncNode
->
configPath
),
"%s/raft_config.json"
,
pSyncInfo
->
path
);
TD_DIRSEP
);
snprintf
(
pSyncNode
->
configPath
,
sizeof
(
pSyncNode
->
configPath
),
"%s%sraft_config.json"
,
pSyncInfo
->
path
,
TD_DIRSEP
);
pSyncNode
->
pWal
=
pSyncInfo
->
pWal
;
pSyncNode
->
pWal
=
pSyncInfo
->
pWal
;
pSyncNode
->
msgcb
=
pSyncInfo
->
msgcb
;
pSyncNode
->
msgcb
=
pSyncInfo
->
msgcb
;
...
@@ -2764,7 +2762,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
...
@@ -2764,7 +2762,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ESyncState
state
=
flag
;
ESyncState
state
=
flag
;
char
eventLog
[
128
];
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit
by
wal from index:%"
PRId64
" to index:%"
PRId64
,
beginIndex
,
endIndex
);
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit wal from index:%"
PRId64
" to index:%"
PRId64
,
beginIndex
,
endIndex
);
syncNodeEventLog
(
ths
,
eventLog
);
syncNodeEventLog
(
ths
,
eventLog
);
// execute fsm
// execute fsm
...
@@ -2782,13 +2780,13 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
...
@@ -2782,13 +2780,13 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// user commit
// user commit
if
((
ths
->
pFsm
->
FpCommitCb
!=
NULL
)
&&
syncUtilUserCommit
(
pEntry
->
originalRpcType
))
{
if
((
ths
->
pFsm
->
FpCommitCb
!=
NULL
)
&&
syncUtilUserCommit
(
pEntry
->
originalRpcType
))
{
bool
internalExecute
=
true
;
bool
internalExecute
=
true
;
if
((
ths
->
replicaNum
==
1
)
&&
ths
->
restoreFinish
&&
(
ths
->
vgId
!=
1
)
)
{
if
((
ths
->
replicaNum
==
1
)
&&
ths
->
restoreFinish
&&
ths
->
vgId
!=
1
)
{
internalExecute
=
false
;
internalExecute
=
false
;
}
}
do
{
do
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"
index:%"
PRId64
", internalExecute
:%d"
,
i
,
internalExecute
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"
commit index:%"
PRId64
", internal
:%d"
,
i
,
internalExecute
);
syncNodeEventLog
(
ths
,
logBuf
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
}
while
(
0
);
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
d5aea7da
...
@@ -229,8 +229,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
...
@@ -229,8 +229,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
do
{
do
{
char
eventLog
[
128
];
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"write index:%"
PRId64
", type:%s,
%d, type2:%s,%d
"
,
pEntry
->
index
,
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"write index:%"
PRId64
", type:%s,
origin type:%s
"
,
pEntry
->
index
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
TMSG_INFO
(
pEntry
->
msgType
),
TMSG_INFO
(
pEntry
->
originalRpcType
)
);
syncNodeEventLog
(
pData
->
pSyncNode
,
eventLog
);
syncNodeEventLog
(
pData
->
pSyncNode
,
eventLog
);
}
while
(
0
);
}
while
(
0
);
...
@@ -468,8 +468,8 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
...
@@ -468,8 +468,8 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
do
{
do
{
char
eventLog
[
128
];
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"write2 index:%"
PRId64
", type:%s,
%d, type2:%s,%d
"
,
pEntry
->
index
,
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"write2 index:%"
PRId64
", type:%s,
origin type:%s
"
,
pEntry
->
index
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
TMSG_INFO
(
pEntry
->
msgType
),
TMSG_INFO
(
pEntry
->
originalRpcType
)
);
syncNodeEventLog
(
pData
->
pSyncNode
,
eventLog
);
syncNodeEventLog
(
pData
->
pSyncNode
,
eventLog
);
}
while
(
0
);
}
while
(
0
);
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
d5aea7da
...
@@ -50,9 +50,8 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
...
@@ -50,9 +50,8 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
SSyncNode
*
pSyncNode
=
pObj
->
data
;
SSyncNode
*
pSyncNode
=
pObj
->
data
;
char
eventLog
[
128
];
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"resp mgr add, type:%s,%d, seq:%"
PRIu64
", handle:%p, ahandle:%p"
,
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"save response handle, type:%s, seq:%"
PRIu64
", handle:%p, ahandle:%p"
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
pStub
->
rpcMsg
.
info
.
ahandle
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
...
@@ -77,9 +76,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
...
@@ -77,9 +76,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
SSyncNode
*
pSyncNode
=
pObj
->
data
;
SSyncNode
*
pSyncNode
=
pObj
->
data
;
char
eventLog
[
128
];
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"resp mgr get, type:%s,%d, seq:%"
PRIu64
", handle:%p, ahandle:%p"
,
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"get response handle, type:%s, seq:%"
PRIu64
", handle:%p, ahandle:%p"
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
pStub
->
rpcMsg
.
info
.
ahandle
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
...
@@ -98,9 +96,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
...
@@ -98,9 +96,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
SSyncNode
*
pSyncNode
=
pObj
->
data
;
SSyncNode
*
pSyncNode
=
pObj
->
data
;
char
eventLog
[
128
];
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"resp mgr get-and-del, type:%s,%d, seq:%"
PRIu64
", handle:%p, ahandle:%p"
,
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"get-and-del response handle, type:%s, seq:%"
PRIu64
", handle:%p, ahandle:%p"
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
pStub
->
rpcMsg
.
info
.
ahandle
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosHashRemove
(
pObj
->
pRespHash
,
&
index
,
sizeof
(
index
));
taosHashRemove
(
pObj
->
pRespHash
,
&
index
,
sizeof
(
index
));
...
...
tests/script/tmp/r1.sim
0 → 100644
浏览文件 @
d5aea7da
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
print =============== step1: create dnodes
sql create dnode $hostname port 7200
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> rows: $rows
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print =============== step2: create database
sql create database db vgroups 1 replica 1
sql show databases
if $rows != 3 then
return -1
endi
sql use db;
sql create table stb (ts timestamp, c int) tags (t int);
sql create table t0 using stb tags (0);
sql insert into t0 values(now, 1);
sql insert into t0 values(now+1s, 1);
return
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录