Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d9d28841
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d9d28841
编写于
11月 04, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/update
上级
f605be10
dc11aa28
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
138 addition
and
158 deletion
+138
-158
src/dnode/inc/dnodeVWrite.h
src/dnode/inc/dnodeVWrite.h
+6
-3
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+1
-1
src/dnode/src/dnodePeer.c
src/dnode/src/dnodePeer.c
+4
-4
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+2
-2
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+2
-2
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+73
-99
src/inc/dnode.h
src/inc/dnode.h
+5
-5
src/os/inc/osAlloc.h
src/os/inc/osAlloc.h
+2
-2
src/os/inc/osFile.h
src/os/inc/osFile.h
+11
-7
src/os/inc/osSocket.h
src/os/inc/osSocket.h
+4
-4
src/os/src/detail/osAlloc.c
src/os/src/detail/osAlloc.c
+3
-3
src/os/src/detail/osFail.c
src/os/src/detail/osFail.c
+11
-11
src/os/src/detail/osFile.c
src/os/src/detail/osFile.c
+6
-2
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+1
-3
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+0
-2
src/util/src/tkvstore.c
src/util/src/tkvstore.c
+0
-2
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+5
-5
src/wal/src/walMgmt.c
src/wal/src/walMgmt.c
+1
-1
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+1
-0
未找到文件。
src/dnode/inc/dnodeVWrite.h
浏览文件 @
d9d28841
...
...
@@ -20,9 +20,12 @@
extern
"C"
{
#endif
int32_t
dnodeInitVnodeWrite
();
void
dnodeCleanupVnodeWrite
();
void
dnodeDispatchToVnodeWriteQueue
(
SRpcMsg
*
pMsg
);
int32_t
dnodeInitVWrite
();
void
dnodeCleanupVWrite
();
void
dnodeDispatchToVWriteQueue
(
SRpcMsg
*
pMsg
);
void
*
dnodeAllocVWriteQueue
(
void
*
pVnode
);
void
dnodeFreeVWriteQueue
(
void
*
wqueue
);
void
dnodeSendRpcVWriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
);
#ifdef __cplusplus
}
...
...
src/dnode/src/dnodeMain.c
浏览文件 @
d9d28841
...
...
@@ -62,7 +62,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{
"wal"
,
walInit
,
walCleanUp
},
{
"check"
,
dnodeInitCheck
,
dnodeCleanupCheck
},
// NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{
"vread"
,
dnodeInitVnodeRead
,
dnodeCleanupVnodeRead
},
{
"vwrite"
,
dnodeInitV
nodeWrite
,
dnodeCleanupVnode
Write
},
{
"vwrite"
,
dnodeInitV
Write
,
dnodeCleanupV
Write
},
{
"mread"
,
dnodeInitMnodeRead
,
dnodeCleanupMnodeRead
},
{
"mwrite"
,
dnodeInitMnodeWrite
,
dnodeCleanupMnodeWrite
},
{
"mpeer"
,
dnodeInitMnodePeer
,
dnodeCleanupMnodePeer
},
...
...
src/dnode/src/dnodePeer.c
浏览文件 @
d9d28841
...
...
@@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL;
static
void
*
tsDnodeClientRpc
=
NULL
;
int32_t
dnodeInitServer
()
{
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_TABLE
]
=
dnodeDispatchToV
node
WriteQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_DROP_TABLE
]
=
dnodeDispatchToV
node
WriteQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_TABLE
]
=
dnodeDispatchToV
node
WriteQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_DROP_STABLE
]
=
dnodeDispatchToV
node
WriteQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_TABLE
]
=
dnodeDispatchToVWriteQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_DROP_TABLE
]
=
dnodeDispatchToVWriteQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_TABLE
]
=
dnodeDispatchToVWriteQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_DROP_STABLE
]
=
dnodeDispatchToVWriteQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_VNODE
]
=
dnodeDispatchToMgmtQueue
;
dnodeProcessReqMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_VNODE
]
=
dnodeDispatchToMgmtQueue
;
...
...
src/dnode/src/dnodeShell.c
浏览文件 @
d9d28841
...
...
@@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0;
static
int32_t
tsDnodeSubmitReqNum
=
0
;
int32_t
dnodeInitShell
()
{
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dnodeDispatchToV
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dnodeDispatchToVWriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_QUERY
]
=
dnodeDispatchToVnodeReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_FETCH
]
=
dnodeDispatchToVnodeReadQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_UPDATE_TAG_VAL
]
=
dnodeDispatchToV
node
WriteQueue
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_UPDATE_TAG_VAL
]
=
dnodeDispatchToVWriteQueue
;
// the following message shall be treated as mnode write
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_CM_CREATE_ACCT
]
=
dnodeDispatchToMnodeWriteQueue
;
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
d9d28841
...
...
@@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
}
}
void
*
dnodeAlloc
ateVnodeRq
ueue
(
void
*
pVnode
)
{
void
*
dnodeAlloc
VReadQ
ueue
(
void
*
pVnode
)
{
pthread_mutex_lock
(
&
readPool
.
mutex
);
taos_queue
queue
=
taosOpenQueue
();
if
(
queue
==
NULL
)
{
...
...
@@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
return
queue
;
}
void
dnodeFreeV
nodeRq
ueue
(
void
*
rqueue
)
{
void
dnodeFreeV
ReadQ
ueue
(
void
*
rqueue
)
{
taosCloseQueue
(
rqueue
);
// dynamically adjust the number of threads
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
d9d28841
...
...
@@ -15,74 +15,65 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tutil.h"
#include "tglobal.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tsync.h"
#include "vnode.h"
#include "dnodeInt.h"
#include "syncInt.h"
#include "dnodeVWrite.h"
#include "dnodeMgmt.h"
#include "dnodeInt.h"
typedef
struct
{
taos_qall
qall
;
taos_qset
qset
;
// queue set
pthread_t
thread
;
// thread
int32_t
workerId
;
// worker ID
taos_qall
qall
;
taos_qset
qset
;
// queue set
int32_t
workerId
;
// worker ID
pthread_t
thread
;
// thread
}
SWriteWorker
;
typedef
struct
{
SRspRet
rspRet
;
int32_t
processedCount
;
int32_t
code
;
void
*
pCont
;
int32_t
contLen
;
SRpcMsg
rpcMsg
;
SRspRet
rspRet
;
SRpcMsg
rpcMsg
;
int32_t
processedCount
;
int32_t
code
;
int32_t
contLen
;
void
*
pCont
;
}
SWriteMsg
;
typedef
struct
{
int32_t
max
;
// max number of workers
int32_t
nextId
;
// from 0 to max-1, cyclic
SWriteWorker
*
writeW
orker
;
int32_t
max
;
// max number of workers
int32_t
nextId
;
// from 0 to max-1, cyclic
SWriteWorker
*
w
orker
;
pthread_mutex_t
mutex
;
}
SWriteWorkerPool
;
static
SWriteWorkerPool
tsVWriteWP
;
static
void
*
dnodeProcessWriteQueue
(
void
*
param
);
static
void
dnodeHandleIdleWorker
(
SWriteWorker
*
pWorker
);
SWriteWorkerPool
wWorkerPool
;
int32_t
dnodeInitVWrite
()
{
tsVWriteWP
.
max
=
tsNumOfCores
;
tsVWriteWP
.
worker
=
(
SWriteWorker
*
)
tcalloc
(
sizeof
(
SWriteWorker
),
tsVWriteWP
.
max
);
if
(
tsVWriteWP
.
worker
==
NULL
)
return
-
1
;
pthread_mutex_init
(
&
tsVWriteWP
.
mutex
,
NULL
);
int32_t
dnodeInitVnodeWrite
()
{
wWorkerPool
.
max
=
tsNumOfCores
;
wWorkerPool
.
writeWorker
=
(
SWriteWorker
*
)
calloc
(
sizeof
(
SWriteWorker
),
wWorkerPool
.
max
);
if
(
wWorkerPool
.
writeWorker
==
NULL
)
return
-
1
;
pthread_mutex_init
(
&
wWorkerPool
.
mutex
,
NULL
);
for
(
int32_t
i
=
0
;
i
<
wWorkerPool
.
max
;
++
i
)
{
wWorkerPool
.
writeWorker
[
i
].
workerId
=
i
;
for
(
int32_t
i
=
0
;
i
<
tsVWriteWP
.
max
;
++
i
)
{
tsVWriteWP
.
worker
[
i
].
workerId
=
i
;
}
dInfo
(
"dnode
write is initialized, max worker %d"
,
wWorkerPool
.
max
);
dInfo
(
"dnode
vwrite is initialized, max worker %d"
,
tsVWriteWP
.
max
);
return
0
;
}
void
dnodeCleanupV
node
Write
()
{
for
(
int32_t
i
=
0
;
i
<
wWorkerPool
.
max
;
++
i
)
{
SWriteWorker
*
pWorker
=
wWorkerPool
.
writeW
orker
+
i
;
void
dnodeCleanupVWrite
()
{
for
(
int32_t
i
=
0
;
i
<
tsVWriteWP
.
max
;
++
i
)
{
SWriteWorker
*
pWorker
=
tsVWriteWP
.
w
orker
+
i
;
if
(
pWorker
->
thread
)
{
taosQsetThreadResume
(
pWorker
->
qset
);
}
}
for
(
int32_t
i
=
0
;
i
<
wWorkerPool
.
max
;
++
i
)
{
SWriteWorker
*
pWorker
=
wWorkerPool
.
writeW
orker
+
i
;
for
(
int32_t
i
=
0
;
i
<
tsVWriteWP
.
max
;
++
i
)
{
SWriteWorker
*
pWorker
=
tsVWriteWP
.
w
orker
+
i
;
if
(
pWorker
->
thread
)
{
pthread_join
(
pWorker
->
thread
,
NULL
);
taosFreeQall
(
pWorker
->
qall
);
...
...
@@ -90,13 +81,13 @@ void dnodeCleanupVnodeWrite() {
}
}
pthread_mutex_destroy
(
&
wWorkerPool
.
mutex
);
free
(
wWorkerPool
.
writeW
orker
);
dInfo
(
"dnode write is closed"
);
pthread_mutex_destroy
(
&
tsVWriteWP
.
mutex
);
tfree
(
tsVWriteWP
.
w
orker
);
dInfo
(
"dnode
v
write is closed"
);
}
void
dnodeDispatchToV
node
WriteQueue
(
SRpcMsg
*
pMsg
)
{
char
*
pCont
=
(
char
*
)
pMsg
->
pCont
;
void
dnodeDispatchToVWriteQueue
(
SRpcMsg
*
pMsg
)
{
char
*
pCont
=
pMsg
->
pCont
;
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_SUBMIT
)
{
SMsgDesc
*
pDesc
=
(
SMsgDesc
*
)
pCont
;
...
...
@@ -111,7 +102,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
taos_queue
queue
=
vnodeAcquireWqueue
(
pHead
->
vgId
);
if
(
queue
)
{
// put message into queue
SWriteMsg
*
pWrite
=
(
SWriteMsg
*
)
taosAllocateQitem
(
sizeof
(
SWriteMsg
));
SWriteMsg
*
pWrite
=
taosAllocateQitem
(
sizeof
(
SWriteMsg
));
pWrite
->
rpcMsg
=
*
pMsg
;
pWrite
->
pCont
=
pCont
;
pWrite
->
contLen
=
pHead
->
contLen
;
...
...
@@ -130,12 +121,12 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
}
}
void
*
dnodeAlloc
ateVnodeWq
ueue
(
void
*
pVnode
)
{
pthread_mutex_lock
(
&
wWorkerPool
.
mutex
);
SWriteWorker
*
pWorker
=
wWorkerPool
.
writeWorker
+
wWorkerPool
.
nextId
;
void
*
dnodeAlloc
VWriteQ
ueue
(
void
*
pVnode
)
{
pthread_mutex_lock
(
&
tsVWriteWP
.
mutex
);
SWriteWorker
*
pWorker
=
tsVWriteWP
.
worker
+
tsVWriteWP
.
nextId
;
void
*
queue
=
taosOpenQueue
();
if
(
queue
==
NULL
)
{
pthread_mutex_unlock
(
&
wWorkerPool
.
mutex
);
pthread_mutex_unlock
(
&
tsVWriteWP
.
mutex
);
return
NULL
;
}
...
...
@@ -143,7 +134,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
pWorker
->
qset
=
taosOpenQset
();
if
(
pWorker
->
qset
==
NULL
)
{
taosCloseQueue
(
queue
);
pthread_mutex_unlock
(
&
wWorkerPool
.
mutex
);
pthread_mutex_unlock
(
&
tsVWriteWP
.
mutex
);
return
NULL
;
}
...
...
@@ -152,7 +143,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
if
(
pWorker
->
qall
==
NULL
)
{
taosCloseQset
(
pWorker
->
qset
);
taosCloseQueue
(
queue
);
pthread_mutex_unlock
(
&
wWorkerPool
.
mutex
);
pthread_mutex_unlock
(
&
tsVWriteWP
.
mutex
);
return
NULL
;
}
pthread_attr_t
thAttr
;
...
...
@@ -160,37 +151,35 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pWorker
->
thread
,
&
thAttr
,
dnodeProcessWriteQueue
,
pWorker
)
!=
0
)
{
dError
(
"failed to create thread to process
read queue, reason:
%s"
,
strerror
(
errno
));
dError
(
"failed to create thread to process
vwrite queue since
%s"
,
strerror
(
errno
));
taosFreeQall
(
pWorker
->
qall
);
taosCloseQset
(
pWorker
->
qset
);
taosCloseQueue
(
queue
);
queue
=
NULL
;
}
else
{
dDebug
(
"write worker:%d is launched"
,
pWorker
->
workerId
);
wWorkerPool
.
nextId
=
(
wWorkerPool
.
nextId
+
1
)
%
wWorkerPool
.
max
;
dDebug
(
"
dnode v
write worker:%d is launched"
,
pWorker
->
workerId
);
tsVWriteWP
.
nextId
=
(
tsVWriteWP
.
nextId
+
1
)
%
tsVWriteWP
.
max
;
}
pthread_attr_destroy
(
&
thAttr
);
}
else
{
taosAddIntoQset
(
pWorker
->
qset
,
queue
,
pVnode
);
wWorkerPool
.
nextId
=
(
wWorkerPool
.
nextId
+
1
)
%
wWorkerPool
.
max
;
tsVWriteWP
.
nextId
=
(
tsVWriteWP
.
nextId
+
1
)
%
tsVWriteWP
.
max
;
}
pthread_mutex_unlock
(
&
wWorkerPool
.
mutex
);
dDebug
(
"pVnode:%p, write queue:%p is allocated"
,
pVnode
,
queue
);
pthread_mutex_unlock
(
&
tsVWriteWP
.
mutex
);
dDebug
(
"pVnode:%p,
dnode v
write queue:%p is allocated"
,
pVnode
,
queue
);
return
queue
;
}
void
dnodeFreeV
nodeWq
ueue
(
void
*
wqueue
)
{
void
dnodeFreeV
WriteQ
ueue
(
void
*
wqueue
)
{
taosCloseQueue
(
wqueue
);
// dynamically adjust the number of threads
}
void
dnodeSendRpcV
node
WriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
)
{
SWriteMsg
*
pWrite
=
(
SWriteMsg
*
)
param
;
if
(
pWrite
==
NULL
)
return
;
void
dnodeSendRpcVWriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
)
{
if
(
param
==
NULL
)
return
;
SWriteMsg
*
pWrite
=
param
;
if
(
code
<
0
)
pWrite
->
code
=
code
;
int32_t
count
=
atomic_add_fetch_32
(
&
pWrite
->
processedCount
,
1
);
...
...
@@ -215,44 +204,45 @@ static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker
*
pWorker
=
(
SWriteWorker
*
)
param
;
SWriteMsg
*
pWrite
;
SWalHead
*
pHead
;
int32_t
numOfMsgs
;
int
type
;
void
*
pVnode
,
*
item
;
SRspRet
*
pRspRet
;
void
*
pVnode
;
void
*
pItem
;
int32_t
numOfMsgs
;
int32_t
qtype
;
dDebug
(
"write worker:%d is running"
,
pWorker
->
workerId
);
dDebug
(
"
dnode v
write worker:%d is running"
,
pWorker
->
workerId
);
while
(
1
)
{
numOfMsgs
=
taosReadAllQitemsFromQset
(
pWorker
->
qset
,
pWorker
->
qall
,
&
pVnode
);
if
(
numOfMsgs
==
0
)
{
dDebug
(
"qset:%p, dnode write got no message from qset, exiting"
,
pWorker
->
qset
);
dDebug
(
"qset:%p, dnode
v
write got no message from qset, exiting"
,
pWorker
->
qset
);
break
;
}
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
pWrite
=
NULL
;
pRspRet
=
NULL
;
taosGetQitem
(
pWorker
->
qall
,
&
type
,
&
i
tem
);
if
(
type
==
TAOS_QTYPE_RPC
)
{
pWrite
=
(
SWriteMsg
*
)
i
tem
;
taosGetQitem
(
pWorker
->
qall
,
&
qtype
,
&
pI
tem
);
if
(
q
type
==
TAOS_QTYPE_RPC
)
{
pWrite
=
pI
tem
;
pRspRet
=
&
pWrite
->
rspRet
;
pHead
=
(
SWalHead
*
)(
pWrite
->
pCont
-
sizeof
(
SWalHead
));
pHead
=
(
SWalHead
*
)(
(
char
*
)
pWrite
->
pCont
-
sizeof
(
SWalHead
));
pHead
->
msgType
=
pWrite
->
rpcMsg
.
msgType
;
pHead
->
version
=
0
;
pHead
->
len
=
pWrite
->
contLen
;
dDebug
(
"%p, rpc msg:%s will be processed in vwrite queue"
,
pWrite
->
rpcMsg
.
ahandle
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
]);
}
else
if
(
type
==
TAOS_QTYPE_CQ
)
{
pHead
=
(
SWalHead
*
)((
char
*
)
i
tem
+
sizeof
(
SSyncHead
));
}
else
if
(
q
type
==
TAOS_QTYPE_CQ
)
{
pHead
=
(
SWalHead
*
)((
char
*
)
pI
tem
+
sizeof
(
SSyncHead
));
dTrace
(
"%p, CQ wal msg:%s will be processed in vwrite queue, version:%"
PRIu64
,
pHead
,
taosMsg
[
pHead
->
msgType
],
pHead
->
version
);
}
else
{
pHead
=
(
SWalHead
*
)
i
tem
;
pHead
=
pI
tem
;
dTrace
(
"%p, wal msg:%s will be processed in vwrite queue, version:%"
PRIu64
,
pHead
,
taosMsg
[
pHead
->
msgType
],
pHead
->
version
);
}
int32_t
code
=
vnodeProcessWrite
(
pVnode
,
type
,
pHead
,
pRspRet
);
int32_t
code
=
vnodeProcessWrite
(
pVnode
,
q
type
,
pHead
,
pRspRet
);
dTrace
(
"%p, msg:%s is processed in vwrite queue, version:%"
PRIu64
", result:%s"
,
pHead
,
taosMsg
[
pHead
->
msgType
],
pHead
->
version
,
tstrerror
(
code
));
...
...
@@ -267,17 +257,17 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one
taosResetQitems
(
pWorker
->
qall
);
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
pWorker
->
qall
,
&
type
,
&
i
tem
);
if
(
type
==
TAOS_QTYPE_RPC
)
{
pWrite
=
(
SWriteMsg
*
)
i
tem
;
dnodeSendRpcV
nodeWriteRsp
(
pVnode
,
i
tem
,
pWrite
->
rpcMsg
.
code
);
}
else
if
(
type
==
TAOS_QTYPE_FWD
)
{
pHead
=
(
SWalHead
*
)
i
tem
;
taosGetQitem
(
pWorker
->
qall
,
&
qtype
,
&
pI
tem
);
if
(
q
type
==
TAOS_QTYPE_RPC
)
{
pWrite
=
pI
tem
;
dnodeSendRpcV
WriteRsp
(
pVnode
,
pI
tem
,
pWrite
->
rpcMsg
.
code
);
}
else
if
(
q
type
==
TAOS_QTYPE_FWD
)
{
pHead
=
pI
tem
;
vnodeConfirmForward
(
pVnode
,
pHead
->
version
,
0
);
taosFreeQitem
(
i
tem
);
taosFreeQitem
(
pI
tem
);
vnodeRelease
(
pVnode
);
}
else
{
taosFreeQitem
(
i
tem
);
taosFreeQitem
(
pI
tem
);
vnodeRelease
(
pVnode
);
}
}
...
...
@@ -285,19 +275,3 @@ static void *dnodeProcessWriteQueue(void *param) {
return
NULL
;
}
UNUSED_FUNC
static
void
dnodeHandleIdleWorker
(
SWriteWorker
*
pWorker
)
{
int32_t
num
=
taosGetQueueNumber
(
pWorker
->
qset
);
if
(
num
>
0
)
{
usleep
(
30000
);
sched_yield
();
}
else
{
taosFreeQall
(
pWorker
->
qall
);
taosCloseQset
(
pWorker
->
qset
);
pWorker
->
qset
=
NULL
;
dDebug
(
"write worker:%d is released"
,
pWorker
->
workerId
);
pthread_exit
(
NULL
);
}
}
src/inc/dnode.h
浏览文件 @
d9d28841
...
...
@@ -53,11 +53,11 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void
dnodeSendMsgToDnodeRecv
(
SRpcMsg
*
rpcMsg
,
SRpcMsg
*
rpcRsp
,
SRpcEpSet
*
epSet
);
void
*
dnodeSendCfgTableToRecv
(
int32_t
vgId
,
int32_t
tid
);
void
*
dnodeAlloc
ateVnodeWq
ueue
(
void
*
pVnode
);
void
dnodeFreeV
nodeWqueue
(
void
*
queue
);
void
*
dnodeAlloc
ateVnodeRq
ueue
(
void
*
pVnode
);
void
dnodeFreeV
nodeRq
ueue
(
void
*
rqueue
);
void
dnodeSendRpcV
node
WriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
);
void
*
dnodeAlloc
VWriteQ
ueue
(
void
*
pVnode
);
void
dnodeFreeV
WriteQueue
(
void
*
w
queue
);
void
*
dnodeAlloc
VReadQ
ueue
(
void
*
pVnode
);
void
dnodeFreeV
ReadQ
ueue
(
void
*
rqueue
);
void
dnodeSendRpcVWriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
);
int32_t
dnodeAllocateMnodePqueue
();
void
dnodeFreeMnodePqueue
();
...
...
src/os/inc/osAlloc.h
浏览文件 @
d9d28841
...
...
@@ -22,14 +22,14 @@ extern "C" {
#ifndef TAOS_OS_FUNC_ALLOC
#define tmalloc(size) malloc(size)
#define tcalloc(
size) calloc(1
, size)
#define tcalloc(
nmemb, size) calloc(nmemb
, size)
#define trealloc(p, size) realloc(p, size)
#define tmemalign(alignment, size) malloc(size)
#define tfree(p) free(p)
#define tmemzero(p, size) memset(p, 0, size)
#else
void
*
tmalloc
(
int32_t
size
);
void
*
tcalloc
(
int32_t
size
);
void
*
tcalloc
(
int32_t
nmemb
,
int32_t
size
);
void
*
trealloc
(
void
*
p
,
int32_t
size
);
void
*
tmemalign
(
int32_t
alignment
,
int32_t
size
);
void
tfree
(
void
*
p
);
...
...
src/os/inc/osFile.h
浏览文件 @
d9d28841
...
...
@@ -31,10 +31,14 @@ extern "C" {
} \
}
int64_t
taosRead
(
int32_t
fd
,
void
*
buf
,
int64_t
count
);
int64_t
taosWrite
(
int32_t
fd
,
void
*
buf
,
int64_t
count
);
int64_t
taosLSeek
(
int32_t
fd
,
int64_t
offset
,
int32_t
whence
);
int64_t
taosRead
Imp
(
int32_t
fd
,
void
*
buf
,
int64_t
count
);
int64_t
taosWrite
Imp
(
int32_t
fd
,
void
*
buf
,
int64_t
count
);
int64_t
taosLSeek
Imp
(
int32_t
fd
,
int64_t
offset
,
int32_t
whence
);
int32_t
taosRenameFile
(
char
*
fullPath
,
char
*
suffix
,
char
delimiter
,
char
**
dstPath
);
#define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
#define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence)
#define taosClose(x) tclose(x)
// TAOS_OS_FUNC_FILE_SENDIFLE
...
...
@@ -42,12 +46,12 @@ int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size);
int64_t
taosFSendFile
(
FILE
*
outfile
,
FILE
*
infile
,
int64_t
*
offset
,
int64_t
size
);
#ifdef TAOS_RANDOM_FILE_FAIL
void
taosSetRandomFileFailFactor
(
int
factor
);
void
taosSetRandomFileFailFactor
(
int
32_t
factor
);
void
taosSetRandomFileFailOutput
(
const
char
*
path
);
#ifdef TAOS_RANDOM_FILE_FAIL_TEST
ssize_t
taosReadFileRandomFail
(
int
fd
,
void
*
buf
,
size
_t
count
,
const
char
*
file
,
uint32_t
line
);
ssize_t
taosWriteFileRandomFail
(
int
fd
,
void
*
buf
,
size
_t
count
,
const
char
*
file
,
uint32_t
line
);
off_t
taosLSeekRandomFail
(
int
fd
,
off_t
offset
,
in
t
whence
,
const
char
*
file
,
uint32_t
line
);
int64_t
taosReadFileRandomFail
(
int32_t
fd
,
void
*
buf
,
int32
_t
count
,
const
char
*
file
,
uint32_t
line
);
int64_t
taosWriteFileRandomFail
(
int32_t
fd
,
void
*
buf
,
int32
_t
count
,
const
char
*
file
,
uint32_t
line
);
int64_t
taosLSeekRandomFail
(
int32_t
fd
,
int64_t
offset
,
int32_
t
whence
,
const
char
*
file
,
uint32_t
line
);
#undef taosRead
#undef taosWrite
#undef taosLSeek
...
...
src/os/inc/osSocket.h
浏览文件 @
d9d28841
...
...
@@ -42,10 +42,10 @@ extern "C" {
#ifdef TAOS_RANDOM_NETWORK_FAIL
#ifdef TAOS_RANDOM_NETWORK_FAIL_TEST
ssize
_t
taosSendRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
);
ssize
_t
taosSendToRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
);
ssize
_t
taosReadSocketRandomFail
(
int32_t
fd
,
void
*
buf
,
size_t
count
);
ssize
_t
taosWriteSocketRandomFail
(
int32_t
fd
,
const
void
*
buf
,
size_t
count
);
int64
_t
taosSendRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
);
int64
_t
taosSendToRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
);
int64
_t
taosReadSocketRandomFail
(
int32_t
fd
,
void
*
buf
,
size_t
count
);
int64
_t
taosWriteSocketRandomFail
(
int32_t
fd
,
const
void
*
buf
,
size_t
count
);
#undef taosSend
#undef taosSendto
#undef taosReadSocket
...
...
src/os/src/detail/osAlloc.c
浏览文件 @
d9d28841
...
...
@@ -32,11 +32,11 @@ void *tmalloc(int32_t size) {
return
p
;
}
void
*
tcalloc
(
int32_t
size
)
{
void
*
p
=
calloc
(
1
,
size
);
void
*
tcalloc
(
int32_t
nmemb
,
int32_t
size
)
{
void
*
p
=
calloc
(
nmemb
,
size
);
if
(
p
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to calloc memory,
size:%d reason:%s"
,
size
,
strerror
(
errno
));
uError
(
"failed to calloc memory,
nmemb:%d size:%d reason:%s"
,
nmemb
,
size
,
strerror
(
errno
));
}
return
p
;
...
...
src/os/src/detail/osFail.c
浏览文件 @
d9d28841
...
...
@@ -20,7 +20,7 @@
#ifdef TAOS_RANDOM_NETWORK_FAIL
ssize
_t
taosSendRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
)
{
int64
_t
taosSendRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
...
...
@@ -29,8 +29,8 @@ ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t
return
send
(
sockfd
,
buf
,
len
,
flags
);
}
ssize_t
taosSendToRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
)
{
int64_t
taosSendToRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
...
...
@@ -39,7 +39,7 @@ ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_
return
sendto
(
sockfd
,
buf
,
len
,
flags
,
dest_addr
,
addrlen
);
}
ssize
_t
taosReadSocketRandomFail
(
int32_t
fd
,
void
*
buf
,
size_t
count
)
{
int64
_t
taosReadSocketRandomFail
(
int32_t
fd
,
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
...
...
@@ -48,7 +48,7 @@ ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) {
return
read
(
fd
,
buf
,
count
);
}
ssize
_t
taosWriteSocketRandomFail
(
int32_t
fd
,
const
void
*
buf
,
size_t
count
)
{
int64
_t
taosWriteSocketRandomFail
(
int32_t
fd
,
const
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
EINTR
;
return
-
1
;
...
...
@@ -105,7 +105,7 @@ void taosSetRandomFileFailOutput(const char *path) {
sigaction
(
SIGILL
,
&
act
,
NULL
);
}
ssize_t
taosReadFileRandomFail
(
int32_t
fd
,
void
*
buf
,
size
_t
count
,
const
char
*
file
,
uint32_t
line
)
{
int64_t
taosReadFileRandomFail
(
int32_t
fd
,
void
*
buf
,
int32
_t
count
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
...
...
@@ -113,10 +113,10 @@ ssize_t taosReadFileRandomFail(int32_t fd, void *buf, size_t count, const char *
}
}
return
taosRead
(
fd
,
buf
,
count
);
return
taosRead
Imp
(
fd
,
buf
,
count
);
}
ssize_t
taosWriteFileRandomFail
(
int32_t
fd
,
void
*
buf
,
size
_t
count
,
const
char
*
file
,
uint32_t
line
)
{
int64_t
taosWriteFileRandomFail
(
int32_t
fd
,
void
*
buf
,
int32
_t
count
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
...
...
@@ -124,10 +124,10 @@ ssize_t taosWriteFileRandomFail(int32_t fd, void *buf, size_t count, const char
}
}
return
taosWrite
(
fd
,
buf
,
count
);
return
taosWrite
Imp
(
fd
,
buf
,
count
);
}
off_t
taosLSeekRandomFail
(
int32_t
fd
,
off
_t
offset
,
int32_t
whence
,
const
char
*
file
,
uint32_t
line
)
{
int64_t
taosLSeekRandomFail
(
int32_t
fd
,
int64
_t
offset
,
int32_t
whence
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
...
...
@@ -135,7 +135,7 @@ off_t taosLSeekRandomFail(int32_t fd, off_t offset, int32_t whence, const char *
}
}
return
taosLSeek
(
fd
,
offset
,
whence
);
return
taosLSeek
Imp
(
fd
,
offset
,
whence
);
}
#endif //TAOS_RANDOM_FILE_FAIL
src/os/src/detail/osFile.c
浏览文件 @
d9d28841
...
...
@@ -71,7 +71,7 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP
return
rename
(
fullPath
,
*
dstPath
);
}
int64_t
taosRead
(
int32_t
fd
,
void
*
buf
,
int64_t
count
)
{
int64_t
taosRead
Imp
(
int32_t
fd
,
void
*
buf
,
int64_t
count
)
{
int64_t
leftbytes
=
count
;
int64_t
readbytes
;
char
*
tbuf
=
(
char
*
)
buf
;
...
...
@@ -95,7 +95,7 @@ int64_t taosRead(int32_t fd, void *buf, int64_t count) {
return
count
;
}
int64_t
taosWrite
(
int32_t
fd
,
void
*
buf
,
int64_t
n
)
{
int64_t
taosWrite
Imp
(
int32_t
fd
,
void
*
buf
,
int64_t
n
)
{
int64_t
nleft
=
n
;
int64_t
nwritten
=
0
;
char
*
tbuf
=
(
char
*
)
buf
;
...
...
@@ -115,6 +115,10 @@ int64_t taosWrite(int32_t fd, void *buf, int64_t n) {
return
n
;
}
int64_t
taosLSeekImp
(
int32_t
fd
,
int64_t
offset
,
int32_t
whence
)
{
return
(
int64_t
)
tlseek
(
fd
,
(
long
)
offset
,
whence
);
}
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE
int64_t
taosSendFile
(
int32_t
dfd
,
int32_t
sfd
,
int64_t
*
offset
,
int64_t
size
)
{
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
d9d28841
...
...
@@ -13,10 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include <regex.h>
#define TAOS_RANDOM_FILE_FAIL_TEST
#include <regex.h>
#include "os.h"
#include "talgo.h"
#include "tchecksum.h"
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
d9d28841
...
...
@@ -14,9 +14,7 @@
*/
#define _DEFAULT_SOURCE
#define TAOS_RANDOM_FILE_FAIL_TEST
#include "os.h"
#include "talgo.h"
#include "tchecksum.h"
...
...
src/util/src/tkvstore.c
浏览文件 @
d9d28841
...
...
@@ -14,9 +14,7 @@
*/
#define _DEFAULT_SOURCE
#define TAOS_RANDOM_FILE_FAIL_TEST
#include "os.h"
#include "hash.h"
#include "taoserror.h"
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
d9d28841
...
...
@@ -255,8 +255,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode
->
fversion
=
pVnode
->
version
;
pVnode
->
wqueue
=
dnodeAlloc
ateVnodeWq
ueue
(
pVnode
);
pVnode
->
rqueue
=
dnodeAlloc
ateVnodeRq
ueue
(
pVnode
);
pVnode
->
wqueue
=
dnodeAlloc
VWriteQ
ueue
(
pVnode
);
pVnode
->
rqueue
=
dnodeAlloc
VReadQ
ueue
(
pVnode
);
if
(
pVnode
->
wqueue
==
NULL
||
pVnode
->
rqueue
==
NULL
)
{
vnodeCleanUp
(
pVnode
);
return
terrno
;
...
...
@@ -322,7 +322,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo
.
getWalInfo
=
vnodeGetWalInfo
;
syncInfo
.
getFileInfo
=
vnodeGetFileInfo
;
syncInfo
.
writeToCache
=
vnodeWriteToQueue
;
syncInfo
.
confirmForward
=
dnodeSendRpcV
node
WriteRsp
;
syncInfo
.
confirmForward
=
dnodeSendRpcVWriteRsp
;
syncInfo
.
notifyRole
=
vnodeNotifyRole
;
syncInfo
.
notifyFlowCtrl
=
vnodeCtrlFlow
;
syncInfo
.
notifyFileSynced
=
vnodeNotifyFileSynced
;
...
...
@@ -409,12 +409,12 @@ void vnodeRelease(void *pVnodeRaw) {
}
if
(
pVnode
->
wqueue
)
{
dnodeFreeV
nodeWq
ueue
(
pVnode
->
wqueue
);
dnodeFreeV
WriteQ
ueue
(
pVnode
->
wqueue
);
pVnode
->
wqueue
=
NULL
;
}
if
(
pVnode
->
rqueue
)
{
dnodeFreeV
nodeRq
ueue
(
pVnode
->
rqueue
);
dnodeFreeV
ReadQ
ueue
(
pVnode
->
rqueue
);
pVnode
->
rqueue
=
NULL
;
}
...
...
src/wal/src/walMgmt.c
浏览文件 @
d9d28841
...
...
@@ -55,7 +55,7 @@ void walCleanUp() {
}
void
*
walOpen
(
char
*
path
,
SWalCfg
*
pCfg
)
{
SWal
*
pWal
=
tcalloc
(
sizeof
(
SWal
));
SWal
*
pWal
=
tcalloc
(
1
,
sizeof
(
SWal
));
if
(
pWal
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
...
...
src/wal/src/walWrite.c
浏览文件 @
d9d28841
...
...
@@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
#define TAOS_RANDOM_FILE_FAIL_TEST
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录