Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
65b4b8f0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
65b4b8f0
编写于
1月 06, 2021
作者:
R
root
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/sim' of
https://github.com/taosdata/TDengine
into feature/sim
上级
31fd62dd
578ef945
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
85 addition
and
40 deletion
+85
-40
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+5
-0
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+4
-1
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+10
-10
src/os/src/detail/osTimer.c
src/os/src/detail/osTimer.c
+5
-0
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+2
-2
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+8
-4
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+5
-4
src/vnode/inc/vnodeStatus.h
src/vnode/inc/vnodeStatus.h
+1
-0
src/vnode/src/vnodeStatus.c
src/vnode/src/vnodeStatus.c
+12
-0
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+9
-5
tests/tsim/src/simExe.c
tests/tsim/src/simExe.c
+3
-1
tests/tsim/src/simMain.c
tests/tsim/src/simMain.c
+3
-3
tests/tsim/src/simSystem.c
tests/tsim/src/simSystem.c
+18
-10
未找到文件。
src/mnode/src/mnodeDb.c
浏览文件 @
65b4b8f0
...
...
@@ -311,6 +311,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return
TSDB_CODE_MND_INVALID_DB_OPTION
;
}
if
(
pCfg
->
replications
>
mnodeGetDnodesNum
())
{
mError
(
"no enough dnode to config replica: %d, #dnodes: %d"
,
pCfg
->
replications
,
mnodeGetDnodesNum
());
return
TSDB_CODE_MND_INVALID_DB_OPTION
;
}
if
(
pCfg
->
quorum
<
TSDB_MIN_DB_REPLICA_OPTION
||
pCfg
->
quorum
>
TSDB_MAX_DB_REPLICA_OPTION
)
{
mError
(
"invalid db option quorum:%d valid range: [%d, %d]"
,
pCfg
->
quorum
,
TSDB_MIN_DB_REPLICA_OPTION
,
TSDB_MAX_DB_REPLICA_OPTION
);
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
65b4b8f0
...
...
@@ -1051,7 +1051,10 @@ static int32_t sdbWriteFwdToQueue(int32_t vgId, void *wparam, int32_t qtype, voi
memcpy
(
pRow
->
pHead
,
pHead
,
sizeof
(
SWalHead
)
+
pHead
->
len
);
pRow
->
rowData
=
pRow
->
pHead
->
cont
;
return
sdbWriteToQueue
(
pRow
,
qtype
);
int32_t
code
=
sdbWriteToQueue
(
pRow
,
qtype
);
if
(
code
==
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
code
=
0
;
return
code
;
}
static
int32_t
sdbWriteRowToQueue
(
SSdbRow
*
pInputRow
,
int32_t
action
)
{
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
65b4b8f0
...
...
@@ -827,21 +827,21 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
SCreateTableMsg
*
pCreateTable
=
(
SCreateTableMsg
*
)
((
char
*
)
pCreate
+
sizeof
(
SCMCreateTableMsg
));
int32_t
code
=
mnodeValidateCreateTableMsg
(
pCreateTable
,
pMsg
);
if
(
code
==
TSDB_CODE_SUCCESS
||
code
==
TSDB_CODE_MND_TABLE_ALREADY_EXIST
)
{
++
pMsg
->
pBatchMasterMsg
->
successed
;
mnodeDestroySubMsg
(
pMsg
);
}
if
(
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mnodeDestroySubMsg
(
pMsg
)
;
return
code
;
++
pMsg
->
pBatchMasterMsg
->
successed
;
mnodeDestroySubMsg
(
pMsg
);
}
else
if
(
code
==
TSDB_CODE_MND_ACTION_NEED_REPROCESSED
)
{
return
code
;
}
else
if
(
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
++
pMsg
->
pBatchMasterMsg
->
received
;
mnodeDestroySubMsg
(
pMsg
)
;
}
if
(
pMsg
->
pBatchMasterMsg
->
successed
+
pMsg
->
pBatchMasterMsg
->
received
>=
pMsg
->
pBatchMasterMsg
->
expected
)
{
return
code
;
}
else
{
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
dnodeSendRpcMWriteRsp
(
pMsg
->
pBatchMasterMsg
,
TSDB_CODE_SUCCESS
);
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
else
{
// batch master replay, reprocess the whole batch
assert
(
0
);
}
...
...
src/os/src/detail/osTimer.c
浏览文件 @
65b4b8f0
...
...
@@ -89,12 +89,17 @@ int taosInitTimer(void (*callback)(int), int ms) {
if
(
code
!=
0
)
{
uError
(
"failed to create timer thread"
);
return
-
1
;
}
else
{
uDebug
(
"timer thread:0x%08"
PRIx64
" is created"
,
taosGetPthreadId
(
timerThread
));
}
return
0
;
}
void
taosUninitTimer
()
{
stopTimer
=
true
;
uDebug
(
"join timer thread:0x%08"
PRIx64
,
taosGetPthreadId
(
timerThread
));
pthread_join
(
timerThread
,
NULL
);
}
...
...
src/query/src/qExecutor.c
浏览文件 @
65b4b8f0
...
...
@@ -1667,7 +1667,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
_end:
assert
(
offset
>=
0
&&
tsCols
!=
NULL
);
if
(
prevTs
!=
INT64_MIN
)
{
if
(
prevTs
!=
INT64_MIN
&&
prevTs
!=
*
(
int64_t
*
)
pRuntimeEnv
->
prevRow
[
0
]
)
{
assert
(
prevRowIndex
>=
0
);
item
->
lastKey
=
prevTs
+
step
;
}
...
...
@@ -7728,4 +7728,4 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
taosCacheRelease
(
pQueryMgmt
->
qinfoPool
,
pQInfo
,
freeHandle
);
return
0
;
}
\ No newline at end of file
}
src/sync/src/syncRestore.c
浏览文件 @
65b4b8f0
...
...
@@ -90,7 +90,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
break
;
}
sDebug
(
"%s, file:%s info is received from master, index:%d size:%"
PRId64
" fver:%"
PRIu64
" magic:%
d
"
,
pPeer
->
id
,
sDebug
(
"%s, file:%s info is received from master, index:%d size:%"
PRId64
" fver:%"
PRIu64
" magic:%
u
"
,
pPeer
->
id
,
minfo
.
name
,
minfo
.
index
,
minfo
.
size
,
minfo
.
fversion
,
minfo
.
magic
);
// remove extra files on slave between the current and last index
...
...
@@ -100,13 +100,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// check the file info
sinfo
=
minfo
;
sinfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
sinfo
.
name
,
&
sinfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
sinfo
.
size
,
&
sinfo
.
fversion
);
sDebug
(
"%s, local file:%s info, index:%d size:%"
PRId64
" fver:%"
PRIu64
" magic:%
d
"
,
pPeer
->
id
,
sinfo
.
name
,
sDebug
(
"%s, local file:%s info, index:%d size:%"
PRId64
" fver:%"
PRIu64
" magic:%
u
"
,
pPeer
->
id
,
sinfo
.
name
,
sinfo
.
index
,
sinfo
.
size
,
sinfo
.
fversion
,
sinfo
.
magic
);
// if file not there or magic is not the same, file shall be synced
memset
(
&
fileAck
,
0
,
sizeof
(
SFileAck
));
syncBuildFileAck
(
&
fileAck
,
pNode
->
vgId
);
fileAck
.
sync
=
(
sinfo
.
magic
!=
minfo
.
magic
||
sinfo
.
name
[
0
]
==
0
)
?
1
:
0
;
fileAck
.
sync
=
(
sinfo
.
magic
!=
minfo
.
magic
||
sinfo
.
size
!=
minfo
.
size
||
sinfo
.
name
[
0
]
==
0
)
?
1
:
0
;
// send file ack
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
fileAck
,
sizeof
(
SFileAck
));
...
...
@@ -195,7 +195,11 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) {
}
lastVer
=
pHead
->
version
;
(
*
pNode
->
writeToCache
)(
pNode
->
vgId
,
pHead
,
TAOS_QTYPE_WAL
,
NULL
);
ret
=
(
*
pNode
->
writeToCache
)(
pNode
->
vgId
,
pHead
,
TAOS_QTYPE_WAL
,
NULL
);
if
(
ret
!=
0
)
{
sError
(
"%s, failed to restore record since %s, hver:%"
PRIu64
,
pPeer
->
id
,
tstrerror
(
ret
),
pHead
->
version
);
break
;
}
}
if
(
code
<
0
)
{
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
65b4b8f0
...
...
@@ -104,7 +104,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
fileInfo
.
name
,
&
fileInfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
fileInfo
.
size
,
&
fileInfo
.
fversion
);
syncBuildFileInfo
(
&
fileInfo
,
pNode
->
vgId
);
sDebug
(
"%s, file:%s info is sent, index:%d size:%"
PRId64
" fver:%"
PRIu64
" magic:%
d
"
,
pPeer
->
id
,
fileInfo
.
name
,
sDebug
(
"%s, file:%s info is sent, index:%d size:%"
PRId64
" fver:%"
PRIu64
" magic:%
u
"
,
pPeer
->
id
,
fileInfo
.
name
,
fileInfo
.
index
,
fileInfo
.
size
,
fileInfo
.
fversion
,
fileInfo
.
magic
);
// send the file info
...
...
@@ -143,10 +143,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// if sync is not required, continue
if
(
fileAck
.
sync
==
0
)
{
fileInfo
.
index
++
;
sDebug
(
"%s, %s is the same
"
,
pPeer
->
id
,
fileInfo
.
name
);
sDebug
(
"%s, %s is the same
, fver:%"
PRIu64
,
pPeer
->
id
,
fileInfo
.
name
,
fileInfo
.
fversion
);
continue
;
}
else
{
sDebug
(
"%s, %s will be sent
"
,
pPeer
->
id
,
fileInfo
.
name
);
sDebug
(
"%s, %s will be sent
, fver:%"
PRIu64
,
pPeer
->
id
,
fileInfo
.
name
,
fileInfo
.
fversion
);
}
// get the full path to file
...
...
@@ -328,7 +328,8 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again
once
=
1
;
offset
+=
bytes
;
sDebug
(
"%s, continue retrieve last wal, bytes:%d offset:%"
PRId64
,
pPeer
->
id
,
bytes
,
offset
);
sDebug
(
"%s, continue retrieve last wal, bytes:%d offset:%"
PRId64
" sver:%"
PRIu64
" fver:%"
PRIu64
,
pPeer
->
id
,
bytes
,
offset
,
pPeer
->
sversion
,
fversion
);
}
return
-
1
;
...
...
src/vnode/inc/vnodeStatus.h
浏览文件 @
65b4b8f0
...
...
@@ -37,6 +37,7 @@ bool vnodeSetResetStatus(SVnodeObj* pVnode);
bool
vnodeInInitStatus
(
SVnodeObj
*
pVnode
);
bool
vnodeInReadyStatus
(
SVnodeObj
*
pVnode
);
bool
vnodeInReadyOrUpdatingStatus
(
SVnodeObj
*
pVnode
);
bool
vnodeInClosingStatus
(
SVnodeObj
*
pVnode
);
bool
vnodeInResetStatus
(
SVnodeObj
*
pVnode
);
...
...
src/vnode/src/vnodeStatus.c
浏览文件 @
65b4b8f0
...
...
@@ -135,6 +135,18 @@ bool vnodeInReadyStatus(SVnodeObj* pVnode) {
return
in
;
}
bool
vnodeInReadyOrUpdatingStatus
(
SVnodeObj
*
pVnode
)
{
bool
in
=
false
;
pthread_mutex_lock
(
&
pVnode
->
statusMutex
);
if
(
pVnode
->
status
==
TAOS_VN_STATUS_READY
||
pVnode
->
status
==
TAOS_VN_STATUS_UPDATING
)
{
in
=
true
;
}
pthread_mutex_unlock
(
&
pVnode
->
statusMutex
);
return
in
;
}
bool
vnodeInClosingStatus
(
SVnodeObj
*
pVnode
)
{
bool
in
=
false
;
pthread_mutex_lock
(
&
pVnode
->
statusMutex
);
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
65b4b8f0
...
...
@@ -90,7 +90,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// write into WAL
code
=
walWrite
(
pVnode
->
wal
,
pHead
);
if
(
code
<
0
)
return
code
;
if
(
code
<
0
)
{
vError
(
"vgId:%d, hver:%"
PRIu64
" vver:%"
PRIu64
" code:0x%x"
,
pVnode
->
vgId
,
pHead
->
version
,
pVnode
->
version
,
code
);
return
code
;
}
pVnode
->
version
=
pHead
->
version
;
...
...
@@ -242,17 +245,18 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
if
(
pWrite
->
qtype
==
TAOS_QTYPE_RPC
)
{
int32_t
code
=
vnodeCheckWrite
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, failed to write into vwqueue since %s"
,
pVnode
->
vgId
,
tstrerror
(
code
));
taosFreeQitem
(
pWrite
);
vnodeRelease
(
pVnode
);
return
code
;
}
}
if
(
!
vnodeInReadyStatus
(
pVnode
))
{
v
Debug
(
"vgId:%d, vnode status is %s, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
vnodeStatus
[
pVnode
->
status
]
,
pVnode
->
refCount
,
pVnode
);
if
(
!
vnodeInReady
OrUpdating
Status
(
pVnode
))
{
v
Error
(
"vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
vnodeStatus
[
pVnode
->
status
],
pVnode
->
refCount
,
pVnode
);
taosFreeQitem
(
pWrite
);
vnodeRelease
(
pVnode
);
vnodeRelease
(
pVnode
);
return
TSDB_CODE_APP_NOT_READY
;
}
...
...
tests/tsim/src/simExe.c
浏览文件 @
65b4b8f0
...
...
@@ -292,6 +292,9 @@ bool simExecuteRunBackCmd(SScript *script, char *option) {
if
(
pthread_create
(
&
newScript
->
bgPid
,
NULL
,
simExecuteScript
,
(
void
*
)
newScript
)
!=
0
)
{
sprintf
(
script
->
error
,
"lineNum:%d. create background thread failed"
,
script
->
lines
[
script
->
linePos
].
lineNum
);
return
false
;
}
else
{
simDebug
(
"script:%s, background thread:0x%08"
PRIx64
" is created"
,
newScript
->
fileName
,
taosGetPthreadId
(
newScript
->
bgPid
));
}
script
->
linePos
++
;
...
...
@@ -448,7 +451,6 @@ void simCloseNativeConnect(SScript *script) {
simDebug
(
"script:%s, taos:%p closed"
,
script
->
fileName
,
script
->
taos
);
taos_close
(
script
->
taos
);
taosMsleep
(
1200
);
script
->
taos
=
NULL
;
}
...
...
tests/tsim/src/simMain.c
浏览文件 @
65b4b8f0
...
...
@@ -40,14 +40,14 @@ int32_t main(int32_t argc, char *argv[]) {
printf
(
"usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-c config]: config directory, default is: %s
\n
"
,
configDir
);
printf
(
" [-f script]: script filename
\n
"
);
exit
(
0
)
;
return
0
;
}
}
if
(
!
simSystemInit
())
{
simError
(
"failed to initialize the system"
);
simSystemCleanUp
();
exit
(
1
)
;
return
-
1
;
}
simInfo
(
"simulator is running ..."
);
...
...
@@ -56,7 +56,7 @@ int32_t main(int32_t argc, char *argv[]) {
SScript
*
script
=
simParseScript
(
scriptFile
);
if
(
script
==
NULL
)
{
simError
(
"parse script file:%s failed"
,
scriptFile
);
exit
(
-
1
)
;
return
-
1
;
}
simScriptList
[
++
simScriptPos
]
=
script
;
...
...
tests/tsim/src/simSystem.c
浏览文件 @
65b4b8f0
...
...
@@ -93,27 +93,34 @@ void simFreeScript(SScript *script) {
for
(
int32_t
i
=
0
;
i
<
script
->
bgScriptLen
;
++
i
)
{
SScript
*
bgScript
=
script
->
bgScripts
[
i
];
sim
Info
(
"script:%s, set stop flag"
,
s
cript
->
fileName
);
sim
Debug
(
"script:%s, is background script, set stop flag"
,
bgS
cript
->
fileName
);
bgScript
->
killed
=
true
;
if
(
taosCheckPthreadValid
(
bgScript
->
bgPid
))
{
pthread_join
(
bgScript
->
bgPid
,
NULL
);
}
simDebug
(
"script:%s, background thread joined"
,
bgScript
->
fileName
);
taos_close
(
bgScript
->
taos
);
tfree
(
bgScript
->
lines
);
tfree
(
bgScript
->
optionBuffer
);
tfree
(
bgScript
);
}
}
simDebug
(
"script:%s, is freed"
,
script
->
fileName
);
taos_close
(
script
->
taos
);
tfree
(
script
->
lines
);
tfree
(
script
->
optionBuffer
);
tfree
(
script
);
simDebug
(
"script:%s, is cleaned"
,
script
->
fileName
);
taos_close
(
script
->
taos
);
tfree
(
script
->
lines
);
tfree
(
script
->
optionBuffer
);
tfree
(
script
);
}
}
SScript
*
simProcessCallOver
(
SScript
*
script
)
{
if
(
script
->
type
==
SIM_SCRIPT_TYPE_MAIN
)
{
simDebug
(
"script:%s, is main script, set stop flag"
,
script
->
fileName
);
if
(
script
->
killed
)
{
simInfo
(
"script:"
FAILED_PREFIX
"%s"
FAILED_POSTFIX
", "
FAILED_PREFIX
"failed"
FAILED_POSTFIX
", error:%s"
,
script
->
fileName
,
script
->
error
);
exit
(
-
1
)
;
return
NULL
;
}
else
{
simInfo
(
"script:"
SUCCESS_PREFIX
"%s"
SUCCESS_POSTFIX
", "
SUCCESS_PREFIX
"success"
SUCCESS_POSTFIX
,
script
->
fileName
);
...
...
@@ -125,13 +132,13 @@ SScript *simProcessCallOver(SScript *script) {
if
(
simScriptPos
==
-
1
)
{
simInfo
(
"----------------------------------------------------------------------"
);
simInfo
(
"Simulation Test Done, "
SUCCESS_PREFIX
"%d"
SUCCESS_POSTFIX
" Passed:
\n
"
,
simScriptSucced
);
exit
(
0
)
;
return
NULL
;
}
return
simScriptList
[
simScriptPos
];
}
}
else
{
sim
Info
(
"script:%s, is stopped by main script
"
,
script
->
fileName
);
sim
Debug
(
"script:%s, is stopped
"
,
script
->
fileName
);
simFreeScript
(
script
);
return
NULL
;
}
...
...
@@ -161,5 +168,6 @@ void *simExecuteScript(void *inputScript) {
}
}
simInfo
(
"thread is stopped"
);
return
NULL
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录