Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4ece8716
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4ece8716
编写于
12月 10, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
12月 10, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4519 from taosdata/feature/wal
Feature/wal
上级
f15bc71b
10510e1d
变更
13
显示空白变更内容
内联
并排
Showing
13 changed file
with
133 addition
and
30 deletion
+133
-30
src/common/src/tglobal.c
src/common/src/tglobal.c
+1
-1
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+1
-1
src/inc/taoserror.h
src/inc/taoserror.h
+3
-2
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+13
-6
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+1
-1
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+1
-1
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+1
-0
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+4
-3
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+7
-7
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+4
-2
src/wal/inc/walInt.h
src/wal/inc/walInt.h
+1
-1
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+1
-3
tests/script/unique/arbitrator/insert_duplicationTs.sim
tests/script/unique/arbitrator/insert_duplicationTs.sim
+95
-2
未找到文件。
src/common/src/tglobal.c
浏览文件 @
4ece8716
...
...
@@ -206,7 +206,7 @@ int32_t tsNumOfLogLines = 10000000;
int32_t
mDebugFlag
=
131
;
int32_t
sdbDebugFlag
=
131
;
int32_t
dDebugFlag
=
135
;
int32_t
vDebugFlag
=
13
1
;
int32_t
vDebugFlag
=
13
5
;
int32_t
cDebugFlag
=
131
;
int32_t
jniDebugFlag
=
131
;
int32_t
odbcDebugFlag
=
131
;
...
...
src/cq/src/cqMain.c
浏览文件 @
4ece8716
...
...
@@ -161,7 +161,7 @@ void cqStop(void *handle) {
return
;
}
SCqContext
*
pContext
=
handle
;
c
Info
(
"vgId:%d, stop all CQs"
,
pContext
->
vgId
);
c
Debug
(
"vgId:%d, stop all CQs"
,
pContext
->
vgId
);
if
(
pContext
->
dbConn
==
NULL
||
pContext
->
master
==
0
)
return
;
pthread_mutex_lock
(
&
pContext
->
mutex
);
...
...
src/inc/taoserror.h
浏览文件 @
4ece8716
...
...
@@ -206,9 +206,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_OUT_OF_MEMORY
,
0
,
0x0508
,
"Out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_APP_ERROR
,
0
,
0x0509
,
"Unexpected generic error in vnode"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_VRESION_FILE
,
0
,
0x050A
,
"Invalid version file"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_FULL
,
0
,
0x050B
,
"Vnode memory is full because commit failed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_FULL
,
0
,
0x050B
,
"Database memory is full for commit failed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_FLOWCTRL
,
0
,
0x050C
,
"Database memory is full for waiting commit"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NOT_SYNCED
,
0
,
0x0511
,
"Database suspended"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_WRITE_AUTH
,
0
,
0x0512
,
"
W
rite operation denied"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_WRITE_AUTH
,
0
,
0x0512
,
"
Database w
rite operation denied"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_SYNCING
,
0
,
0x0513
,
"Database is syncing"
)
// tsdb
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
4ece8716
...
...
@@ -282,25 +282,32 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
// not thread safe, need optimized
int32_t
mnodeSaveQueryStreamList
(
SConnObj
*
pConn
,
SHeartBeatMsg
*
pHBMsg
)
{
pConn
->
numOfQueries
=
htonl
(
pHBMsg
->
numOfQueries
);
if
(
pConn
->
numOfQueries
>
0
)
{
pConn
->
numOfQueries
=
0
;
pConn
->
numOfStreams
=
0
;
int32_t
numOfQueries
=
htonl
(
pHBMsg
->
numOfQueries
);
if
(
numOfQueries
>
0
)
{
if
(
pConn
->
pQueries
==
NULL
)
{
pConn
->
pQueries
=
calloc
(
sizeof
(
SQueryDesc
),
QUERY_STREAM_SAVE_SIZE
);
}
int32_t
saveSize
=
MIN
(
QUERY_STREAM_SAVE_SIZE
,
pConn
->
numOfQueries
)
*
sizeof
(
SQueryDesc
);
pConn
->
numOfQueries
=
MIN
(
QUERY_STREAM_SAVE_SIZE
,
numOfQueries
);
int32_t
saveSize
=
pConn
->
numOfQueries
*
sizeof
(
SQueryDesc
);
if
(
saveSize
>
0
&&
pConn
->
pQueries
!=
NULL
)
{
memcpy
(
pConn
->
pQueries
,
pHBMsg
->
pData
,
saveSize
);
}
}
pConn
->
numOfStreams
=
htonl
(
pHBMsg
->
numOfStreams
);
if
(
pConn
->
numOfStreams
>
0
)
{
int32_t
numOfStreams
=
htonl
(
pHBMsg
->
numOfStreams
);
if
(
numOfStreams
>
0
)
{
if
(
pConn
->
pStreams
==
NULL
)
{
pConn
->
pStreams
=
calloc
(
sizeof
(
SStreamDesc
),
QUERY_STREAM_SAVE_SIZE
);
}
int32_t
saveSize
=
MIN
(
QUERY_STREAM_SAVE_SIZE
,
pConn
->
numOfStreams
)
*
sizeof
(
SStreamDesc
);
pConn
->
numOfStreams
=
MIN
(
QUERY_STREAM_SAVE_SIZE
,
numOfStreams
);
int32_t
saveSize
=
pConn
->
numOfStreams
*
sizeof
(
SStreamDesc
);
if
(
saveSize
>
0
&&
pConn
->
pStreams
!=
NULL
)
{
memcpy
(
pConn
->
pStreams
,
pHBMsg
->
pData
+
pConn
->
numOfQueries
*
sizeof
(
SQueryDesc
),
saveSize
);
}
...
...
src/sync/src/syncMain.c
浏览文件 @
4ece8716
...
...
@@ -1312,7 +1312,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
}
// always update version
sTrace
(
"vgId:%d,
forward to peer
, replica:%d role:%s qtype:%s hver:%"
PRIu64
,
pNode
->
vgId
,
pNode
->
replica
,
sTrace
(
"vgId:%d,
update nodeVersion
, replica:%d role:%s qtype:%s hver:%"
PRIu64
,
pNode
->
vgId
,
pNode
->
replica
,
syncRole
[
nodeRole
],
qtypeStr
[
qtype
],
pWalHead
->
version
);
nodeVersion
=
pWalHead
->
version
;
...
...
src/sync/src/syncRestore.c
浏览文件 @
4ece8716
...
...
@@ -43,7 +43,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
snprintf
(
fname
,
sizeof
(
fname
),
"%s/%s"
,
pNode
->
path
,
name
);
(
void
)
remove
(
fname
);
s
Debug
(
"%s, %s is removed
"
,
pPeer
->
id
,
fname
);
s
Info
(
"%s, %s is removed for its extra
"
,
pPeer
->
id
,
fname
);
index
++
;
if
(
index
>
eindex
)
break
;
...
...
src/vnode/inc/vnodeInt.h
浏览文件 @
4ece8716
...
...
@@ -46,6 +46,7 @@ typedef struct {
int8_t
isFull
;
int8_t
isCommiting
;
uint64_t
version
;
// current version
uint64_t
cversion
;
// version while commit start
uint64_t
fversion
;
// version on saved data file
void
*
wqueue
;
// write queue
void
*
qqueue
;
// read query queue
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
4ece8716
...
...
@@ -203,8 +203,8 @@ int32_t vnodeOpen(int32_t vgId) {
code
=
vnodeReadVersion
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, failed to read version, generate it from data file"
,
pVnode
->
vgId
);
// Allow vnode start even when read
version fails, set version as walV
ersion or zero
vError
(
"vgId:%d, failed to read
file
version, generate it from data file"
,
pVnode
->
vgId
);
// Allow vnode start even when read
file version fails, set file version as wal v
ersion or zero
// vnodeCleanUp(pVnode);
// return code;
}
...
...
@@ -442,6 +442,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
pVnode
->
fversion
,
pVnode
->
version
);
pVnode
->
isCommiting
=
0
;
pVnode
->
isFull
=
1
;
pVnode
->
cversion
=
pVnode
->
version
;
return
0
;
}
...
...
@@ -457,7 +458,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
if
(
status
==
TSDB_STATUS_COMMIT_OVER
)
{
pVnode
->
isCommiting
=
0
;
pVnode
->
isFull
=
0
;
pVnode
->
fversion
=
pVnode
->
version
;
pVnode
->
fversion
=
pVnode
->
c
version
;
vDebug
(
"vgId:%d, commit over, fver:%"
PRIu64
" vver:%"
PRIu64
,
pVnode
->
vgId
,
pVnode
->
fversion
,
pVnode
->
version
);
if
(
!
vnodeInInitStatus
(
pVnode
))
{
walRemoveOneOldFile
(
pVnode
->
wal
);
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
4ece8716
...
...
@@ -133,7 +133,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void
int32_t
code
=
vnodeWriteToRQueue
(
pVnode
,
qhandle
,
0
,
TAOS_QTYPE_QUERY
,
&
rpcMsg
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
v
Debug
(
"QInfo:%p add to vread queue for exec query"
,
*
qhandle
);
v
Trace
(
"QInfo:%p add to vread queue for exec query"
,
*
qhandle
);
}
return
code
;
...
...
@@ -164,7 +164,7 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle,
}
}
else
{
*
freeHandle
=
true
;
v
Debug
(
"QInfo:%p exec completed, free handle:%d"
,
*
handle
,
*
freeHandle
);
v
Trace
(
"QInfo:%p exec completed, free handle:%d"
,
*
handle
,
*
freeHandle
);
}
}
else
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
...
...
@@ -266,7 +266,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
if
(
handle
!=
NULL
)
{
v
Debug
(
"vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app"
,
vgId
,
*
handle
);
v
Trace
(
"vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app"
,
vgId
,
*
handle
);
code
=
vnodePutItemIntoReadQueue
(
pVnode
,
handle
,
pRead
->
rpcHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRsp
->
code
=
code
;
...
...
@@ -278,7 +278,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
assert
(
pCont
!=
NULL
);
void
**
qhandle
=
(
void
**
)
pRead
->
qhandle
;
v
Debug
(
"vgId:%d, QInfo:%p, dnode continues to exec query"
,
pVnode
->
vgId
,
*
qhandle
);
v
Trace
(
"vgId:%d, QInfo:%p, dnode continues to exec query"
,
pVnode
->
vgId
,
*
qhandle
);
// In the retrieve blocking model, only 50% CPU will be used in query processing
if
(
tsHalfCoresForQuery
)
{
...
...
@@ -294,7 +294,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
pRead
->
rpcHandle
=
qGetResultRetrieveMsg
(
*
qhandle
);
assert
(
pRead
->
rpcHandle
!=
NULL
);
v
Debug
(
"vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p"
,
pVnode
->
vgId
,
*
qhandle
,
v
Trace
(
"vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p"
,
pVnode
->
vgId
,
*
qhandle
,
pRead
->
rpcHandle
);
// set the real rsp error code
...
...
@@ -327,7 +327,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
pRetrieve
->
qhandle
=
htobe64
(
pRetrieve
->
qhandle
);
v
Debug
(
"vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p"
,
pVnode
->
vgId
,
(
void
*
)
pRetrieve
->
qhandle
,
v
Trace
(
"vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p"
,
pVnode
->
vgId
,
(
void
*
)
pRetrieve
->
qhandle
,
pRetrieve
->
free
,
pRead
->
rpcHandle
);
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
...
...
@@ -410,6 +410,6 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
pMsg
->
header
.
vgId
=
htonl
(
vgId
);
pMsg
->
header
.
contLen
=
htonl
(
sizeof
(
SRetrieveTableMsg
));
v
Debug
(
"QInfo:%p register qhandle to connect:%p"
,
qhandle
,
handle
);
v
Trace
(
"QInfo:%p register qhandle to connect:%p"
,
qhandle
,
handle
);
return
rpcReportProgress
(
handle
,
(
char
*
)
pMsg
,
sizeof
(
SRetrieveTableMsg
));
}
src/vnode/src/vnodeWrite.c
浏览文件 @
4ece8716
...
...
@@ -244,7 +244,7 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
int32_t
queued
=
atomic_add_fetch_32
(
&
pVnode
->
queuedWMsg
,
1
);
if
(
queued
>
MAX_QUEUED_MSG_NUM
)
{
vDebug
(
"vgId:%d, too many msg:%d in vwqueue, flow control"
,
pVnode
->
vgId
,
queued
);
taosMsleep
(
1
);
taosMsleep
(
3
);
}
code
=
vnodePerformFlowCtrl
(
pWrite
);
...
...
@@ -271,6 +271,8 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
SVnodeObj
*
pVnode
=
pWrite
->
pVnode
;
int32_t
code
=
TSDB_CODE_VND_SYNCING
;
if
(
pVnode
->
flowctrlLevel
<=
0
)
code
=
TSDB_CODE_VND_IS_FLOWCTRL
;
pWrite
->
processedCount
++
;
if
(
pWrite
->
processedCount
>
100
)
{
vError
(
"vgId:%d, msg:%p, failed to process since %s, retry:%d"
,
pVnode
->
vgId
,
pWrite
,
tstrerror
(
code
),
...
...
@@ -290,8 +292,8 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
static
int32_t
vnodePerformFlowCtrl
(
SVWriteMsg
*
pWrite
)
{
SVnodeObj
*
pVnode
=
pWrite
->
pVnode
;
if
(
pVnode
->
flowctrlLevel
<=
0
)
return
0
;
if
(
pWrite
->
qtype
!=
TAOS_QTYPE_RPC
)
return
0
;
if
(
pVnode
->
queuedWMsg
<
MAX_QUEUED_MSG_NUM
&&
pVnode
->
flowctrlLevel
<=
0
)
return
0
;
if
(
tsFlowCtrl
==
0
)
{
int32_t
ms
=
pow
(
2
,
pVnode
->
flowctrlLevel
+
2
);
...
...
src/wal/inc/walInt.h
浏览文件 @
4ece8716
...
...
@@ -38,7 +38,7 @@ extern int32_t wDebugFlag;
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_FILE_NUM 3
#define WAL_FILE_NUM
1 //
3
typedef
struct
{
uint64_t
version
;
...
...
src/wal/src/walWrite.c
浏览文件 @
4ece8716
...
...
@@ -173,7 +173,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
continue
;
}
wInfo
(
"vgId:%d, file:%s, restore success
"
,
pWal
->
vgId
,
walName
);
wInfo
(
"vgId:%d, file:%s, restore success
, wver:%"
PRIu64
,
pWal
->
vgId
,
walName
,
pWal
->
version
);
count
++
;
}
...
...
@@ -267,8 +267,6 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
return
TAOS_SYSTEM_ERROR
(
errno
);
}
wDebug
(
"vgId:%d, file:%s, start to restore"
,
pWal
->
vgId
,
name
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
int64_t
offset
=
0
;
SWalHead
*
pHead
=
buffer
;
...
...
tests/script/unique/arbitrator/insert_duplicationTs.sim
浏览文件 @
4ece8716
...
...
@@ -91,8 +91,11 @@ while $i < $tblNum
$i = $i + 1
endw
sql show db.vgroups;
print d1: $data04 $data05 , d2: $data06 $data07
sql select count(*) from $stb
print rows:$rows data00:$data00
print r
test1==> r
ows:$rows data00:$data00
if $rows != 1 then
return -1
endi
...
...
@@ -103,6 +106,15 @@ endi
$totalRows = $data00
sql select count(*) from $stb
print test2==> rows:$rows data00:$data00
sql select count(*) from $stb
print test3==> rows:$rows data00:$data00
sql select count(*) from $stb
print test4==> rows:$rows data00:$data00
sql select count(*) from $stb
print test5==> rows:$rows data00:$data00
print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc
sql insert into $tb values ( now - 20d , -20 )
sql insert into $tb values ( now - 40d , -40 )
...
...
@@ -153,12 +165,21 @@ if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
sql select count(*) from $stb
print data00 $data00
sql select count(*) from $stb
print data00 $data00
sql select count(*) from $stb
print data00 $data00
print ============== step5: insert two data rows: now-16d, now+16d,
sql insert into $tb values ( now - 21d , -21 )
sql insert into $tb values ( now - 41d , -41 )
$totalRows = $totalRows + 2
print ============== step
5
: restart dnode2, waiting sync end
print ============== step
6
: restart dnode2, waiting sync end
system sh/exec.sh -n dnode2 -s start
sleep 3000
$loopCnt = 0
...
...
@@ -192,9 +213,81 @@ endi
sleep $sleepTimer
# check using select
sleep 5000
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录