Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
50cdd465
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
50cdd465
编写于
11月 05, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/wal' into feature/table
上级
a8bdb87c
16132ff9
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
101 addition
and
145 deletion
+101
-145
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+1
-1
src/cq/test/cqtest.c
src/cq/test/cqtest.c
+1
-1
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+37
-74
src/inc/tcq.h
src/inc/tcq.h
+1
-1
src/inc/tsync.h
src/inc/tsync.h
+2
-2
src/inc/twal.h
src/inc/twal.h
+2
-2
src/inc/vnode.h
src/inc/vnode.h
+17
-5
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+7
-7
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+1
-1
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+2
-2
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+0
-2
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+2
-16
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+25
-28
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+2
-2
src/wal/test/waltest.c
src/wal/test/waltest.c
+1
-1
未找到文件。
src/cq/src/cqMain.c
浏览文件 @
50cdd465
...
...
@@ -334,7 +334,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pHead
->
version
=
0
;
// write into vnode write queue
pContext
->
cqWrite
(
pContext
->
ahandle
,
pHead
,
TAOS_QTYPE_CQ
);
pContext
->
cqWrite
(
pContext
->
ahandle
,
pHead
,
TAOS_QTYPE_CQ
,
NULL
);
free
(
buffer
);
}
src/cq/test/cqtest.c
浏览文件 @
50cdd465
...
...
@@ -24,7 +24,7 @@
int64_t
ver
=
0
;
void
*
pCq
=
NULL
;
int
writeToQueue
(
void
*
pVnode
,
void
*
data
,
int
type
)
{
int
writeToQueue
(
void
*
pVnode
,
void
*
data
,
int
type
,
void
*
pMsg
)
{
return
0
;
}
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
50cdd465
...
...
@@ -31,14 +31,6 @@ typedef struct {
pthread_t
thread
;
// thread
}
SWriteWorker
;
typedef
struct
{
SRspRet
rspRet
;
SRpcMsg
rpcMsg
;
int32_t
processedCount
;
int32_t
code
;
int32_t
contLen
;
void
*
pCont
;
}
SWriteMsg
;
typedef
struct
{
int32_t
max
;
// max number of workers
...
...
@@ -86,39 +78,38 @@ void dnodeCleanupVWrite() {
dInfo
(
"dnode vwrite is closed"
);
}
void
dnodeDispatchToVWriteQueue
(
SRpcMsg
*
pMsg
)
{
char
*
pCont
=
pMsg
->
pCont
;
void
dnodeDispatchToVWriteQueue
(
SRpcMsg
*
pRpcMsg
)
{
int32_t
code
;
char
*
pCont
=
pRpcMsg
->
pCont
;
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_SUBMIT
)
{
if
(
p
Rpc
Msg
->
msgType
==
TSDB_MSG_TYPE_SUBMIT
)
{
SMsgDesc
*
pDesc
=
(
SMsgDesc
*
)
pCont
;
pDesc
->
numOfVnodes
=
htonl
(
pDesc
->
numOfVnodes
);
pCont
+=
sizeof
(
SMsgDesc
);
}
SMsgHead
*
p
Head
=
(
SMsgHead
*
)
pCont
;
p
Head
->
vgId
=
htonl
(
pHead
->
vgId
);
p
Head
->
contLen
=
htonl
(
pHead
->
contLen
);
SMsgHead
*
p
Msg
=
(
SMsgHead
*
)
pCont
;
p
Msg
->
vgId
=
htonl
(
pMsg
->
vgId
);
p
Msg
->
contLen
=
htonl
(
pMsg
->
contLen
);
taos_queue
queue
=
vnodeAcquireWqueue
(
pHead
->
vgId
);
if
(
queue
)
{
// put message into queue
SWriteMsg
*
pWrite
=
taosAllocateQitem
(
sizeof
(
SWriteMsg
));
pWrite
->
rpcMsg
=
*
pMsg
;
pWrite
->
pCont
=
pCont
;
pWrite
->
contLen
=
pHead
->
contLen
;
taosWriteQitem
(
queue
,
TAOS_QTYPE_RPC
,
pWrite
);
void
*
pVnode
=
vnodeAcquire
(
pMsg
->
vgId
);
if
(
pVnode
==
NULL
)
{
code
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
}
else
{
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
TSDB_CODE_VND_INVALID_VGROUP_ID
,
.
msgType
=
0
};
SWalHead
*
pHead
=
(
SWalHead
*
)(
pCont
-
sizeof
(
SWalHead
));
pHead
->
msgType
=
pRpcMsg
->
msgType
;
pHead
->
version
=
0
;
pHead
->
len
=
pMsg
->
contLen
;
code
=
vnodeWriteToQueue
(
pVnode
,
pHead
,
TAOS_QTYPE_RPC
,
pRpcMsg
);
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
code
=
code
};
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pMsg
->
pCont
);
}
vnodeRelease
(
pVnode
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
}
void
*
dnodeAllocVWriteQueue
(
void
*
pVnode
)
{
...
...
@@ -179,7 +170,7 @@ void dnodeFreeVWriteQueue(void *wqueue) {
void
dnodeSendRpcVWriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
)
{
if
(
param
==
NULL
)
return
;
SWriteMsg
*
pWrite
=
param
;
S
V
WriteMsg
*
pWrite
=
param
;
if
(
code
<
0
)
pWrite
->
code
=
code
;
int32_t
count
=
atomic_add_fetch_32
(
&
pWrite
->
processedCount
,
1
);
...
...
@@ -187,26 +178,22 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if
(
count
<=
1
)
return
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pWrite
->
rpc
Msg
.
h
andle
,
.
handle
=
pWrite
->
rpc
H
andle
,
.
pCont
=
pWrite
->
rspRet
.
rsp
,
.
contLen
=
pWrite
->
rspRet
.
len
,
.
code
=
pWrite
->
code
,
};
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
pWrite
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pWrite
);
vnodeRelease
(
pVnode
);
}
static
void
*
dnodeProcessWriteQueue
(
void
*
param
)
{
SWriteWorker
*
pWorker
=
(
SWriteWorker
*
)
param
;
SWriteMsg
*
pWrite
;
SWalHead
*
pHead
;
SRspRet
*
pRspRet
;
SWriteWorker
*
pWorker
=
param
;
SVWriteMsg
*
pWrite
;
void
*
pVnode
;
void
*
pItem
;
int32_t
numOfMsgs
;
int32_t
qtype
;
...
...
@@ -220,36 +207,14 @@ static void *dnodeProcessWriteQueue(void *param) {
}
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
pWrite
=
NULL
;
pRspRet
=
NULL
;
taosGetQitem
(
pWorker
->
qall
,
&
qtype
,
&
pItem
);
if
(
qtype
==
TAOS_QTYPE_RPC
)
{
pWrite
=
pItem
;
pRspRet
=
&
pWrite
->
rspRet
;
pHead
=
(
SWalHead
*
)((
char
*
)
pWrite
->
pCont
-
sizeof
(
SWalHead
));
pHead
->
msgType
=
pWrite
->
rpcMsg
.
msgType
;
pHead
->
version
=
0
;
pHead
->
len
=
pWrite
->
contLen
;
dDebug
(
"%p, rpc msg:%s will be processed in vwrite queue"
,
pWrite
->
rpcMsg
.
ahandle
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
]);
}
else
if
(
qtype
==
TAOS_QTYPE_CQ
)
{
pHead
=
(
SWalHead
*
)((
char
*
)
pItem
+
sizeof
(
SSyncHead
));
dTrace
(
"%p, CQ wal msg:%s will be processed in vwrite queue, version:%"
PRIu64
,
pHead
,
taosMsg
[
pHead
->
msgType
],
pHead
->
version
);
}
else
{
pHead
=
pItem
;
dTrace
(
"%p, wal msg:%s will be processed in vwrite queue, version:%"
PRIu64
,
pHead
,
taosMsg
[
pHead
->
msgType
],
pHead
->
version
);
}
taosGetQitem
(
pWorker
->
qall
,
&
qtype
,
(
void
**
)
&
pWrite
);
dTrace
(
"%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%"
PRIu64
,
pWrite
->
rpcAhandle
,
pWrite
,
taosMsg
[
pWrite
->
pHead
->
msgType
],
qtype
,
pWrite
->
pHead
->
version
);
int32_t
code
=
vnodeProcessWrite
(
pVnode
,
qtype
,
pHead
,
pRspRet
);
dTrace
(
"%p, msg:%s is processed in vwrite queue, version:%"
PRIu64
", result:%s"
,
pHead
,
taosMsg
[
pHead
->
msgType
],
pHead
->
version
,
tstrerror
(
code
));
pWrite
->
code
=
vnodeProcessWrite
(
pVnode
,
qtype
,
pWrite
);
if
(
pWrite
->
code
<=
0
)
pWrite
->
processedCount
=
1
;
if
(
pWrite
)
{
pWrite
->
rpcMsg
.
code
=
code
;
if
(
code
<=
0
)
pWrite
->
processedCount
=
1
;
}
dTrace
(
"msg:%p is processed in vwrite queue, result:%s"
,
pWrite
,
tstrerror
(
pWrite
->
code
));
}
walFsync
(
vnodeGetWal
(
pVnode
));
...
...
@@ -257,17 +222,15 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one
taosResetQitems
(
pWorker
->
qall
);
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
pWorker
->
qall
,
&
qtype
,
&
pItem
);
taosGetQitem
(
pWorker
->
qall
,
&
qtype
,
(
void
**
)
&
pWrite
);
if
(
qtype
==
TAOS_QTYPE_RPC
)
{
pWrite
=
pItem
;
dnodeSendRpcVWriteRsp
(
pVnode
,
pItem
,
pWrite
->
rpcMsg
.
code
);
dnodeSendRpcVWriteRsp
(
pVnode
,
pWrite
,
pWrite
->
code
);
}
else
if
(
qtype
==
TAOS_QTYPE_FWD
)
{
pHead
=
pItem
;
vnodeConfirmForward
(
pVnode
,
pHead
->
version
,
0
);
taosFreeQitem
(
pItem
);
vnodeConfirmForward
(
pVnode
,
pWrite
->
pHead
->
version
,
0
);
taosFreeQitem
(
pWrite
);
vnodeRelease
(
pVnode
);
}
else
{
taosFreeQitem
(
p
Item
);
taosFreeQitem
(
p
Write
);
vnodeRelease
(
pVnode
);
}
}
...
...
src/inc/tcq.h
浏览文件 @
50cdd465
...
...
@@ -21,7 +21,7 @@ extern "C" {
#include "tdataformat.h"
typedef
int
(
*
FCqWrite
)(
void
*
ahandle
,
void
*
pHead
,
int
type
);
typedef
int
32_t
(
*
FCqWrite
)(
void
*
ahandle
,
void
*
pHead
,
int32_t
qtype
,
void
*
pMsg
);
typedef
struct
{
int
vgId
;
...
...
src/inc/tsync.h
浏览文件 @
50cdd465
...
...
@@ -70,8 +70,8 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef
int32_t
(
*
FGetWalInfo
)(
void
*
ahandle
,
char
*
fileName
,
int64_t
*
fileId
);
// when a forward pkt is received, call this to handle data
typedef
int
(
*
FWriteToCache
)(
void
*
ahandle
,
void
*
pHead
,
int
type
);
// when a forward pkt is received, call this to handle data
typedef
int
32_t
(
*
FWriteToCache
)(
void
*
ahandle
,
void
*
pHead
,
int32_t
qtype
,
void
*
pMsg
);
// when forward is confirmed by peer, master call this API to notify app
typedef
void
(
*
FConfirmForward
)(
void
*
ahandle
,
void
*
mhandle
,
int32_t
code
);
...
...
src/inc/twal.h
浏览文件 @
50cdd465
...
...
@@ -43,8 +43,8 @@ typedef struct {
int8_t
keep
;
// keep the wal file when closed
}
SWalCfg
;
typedef
void
*
twalh
;
// WAL HANDLE
typedef
int
(
*
FWalWrite
)(
void
*
ahandle
,
void
*
pHead
,
int
type
);
typedef
void
*
twalh
;
// WAL HANDLE
typedef
int
32_t
FWalWrite
(
void
*
ahandle
,
void
*
pHead
,
int32_t
qtype
,
void
*
pMsg
);
int32_t
walInit
();
void
walCleanUp
();
...
...
src/inc/vnode.h
浏览文件 @
50cdd465
...
...
@@ -20,6 +20,8 @@
extern
"C"
{
#endif
#include "twal.h"
typedef
enum
_VN_STATUS
{
TAOS_VN_STATUS_INIT
,
TAOS_VN_STATUS_READY
,
...
...
@@ -29,9 +31,9 @@ typedef enum _VN_STATUS {
}
EVnStatus
;
typedef
struct
{
int
len
;
void
*
rsp
;
void
*
qhandle
;
//
used by query and retrieve msg
int
32_t
len
;
void
*
rsp
;
void
*
qhandle
;
//
used by query and retrieve msg
}
SRspRet
;
typedef
struct
{
...
...
@@ -41,6 +43,16 @@ typedef struct {
SRpcMsg
rpcMsg
;
}
SReadMsg
;
typedef
struct
{
int32_t
code
;
int32_t
processedCount
;
void
*
rpcHandle
;
void
*
rpcAhandle
;
SRspRet
rspRet
;
char
reserveForSync
[
16
];
SWalHead
pHead
[];
}
SVWriteMsg
;
extern
char
*
vnodeStatus
[];
int32_t
vnodeCreate
(
SCreateVnodeMsg
*
pVnodeCfg
);
...
...
@@ -51,11 +63,11 @@ int32_t vnodeClose(int32_t vgId);
void
*
vnodeAcquire
(
int32_t
vgId
);
// add refcount
void
*
vnodeAcquireRqueue
(
int32_t
vgId
);
// add refCount, get read queue
void
*
vnodeAcquireWqueue
(
int32_t
vgId
);
// add recCount, get write queue
void
vnodeRelease
(
void
*
pVnode
);
// dec refCount
void
*
vnodeGetWal
(
void
*
pVnode
);
int32_t
vnodeProcessWrite
(
void
*
pVnode
,
int
qtype
,
void
*
pHead
,
void
*
item
);
int32_t
vnodeWriteToQueue
(
void
*
vparam
,
void
*
wparam
,
int32_t
qtype
,
void
*
pMsg
);
int32_t
vnodeProcessWrite
(
void
*
param
,
int32_t
qtype
,
SVWriteMsg
*
pWrite
);
int32_t
vnodeCheckWrite
(
void
*
pVnode
);
int32_t
vnodeGetVnodeList
(
int32_t
vnodeList
[],
int32_t
*
numOfVnodes
);
void
vnodeBuildStatusMsg
(
void
*
param
);
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
50cdd465
...
...
@@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall;
static
taos_queue
tsSdbWriteQueue
;
static
SSdbWriteWorkerPool
tsSdbPool
;
static
int
sdbWrite
(
void
*
param
,
void
*
data
,
int
type
);
static
int
sdbWriteToQueue
(
void
*
param
,
void
*
data
,
int
type
);
static
int
32_t
sdbWrite
(
void
*
param
,
void
*
data
,
int32_t
type
,
void
*
pMsg
);
static
int
32_t
sdbWriteToQueue
(
void
*
param
,
void
*
data
,
int32_t
type
,
void
*
pMsg
);
static
void
*
sdbWorkerFp
(
void
*
param
);
static
int32_t
sdbInitWriteWorker
();
static
void
sdbCleanupWriteWorker
();
...
...
@@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
return
TSDB_CODE_SUCCESS
;
}
static
int
sdbWrite
(
void
*
param
,
void
*
data
,
int
type
)
{
static
int
sdbWrite
(
void
*
param
,
void
*
data
,
int
32_t
type
,
void
*
pMsg
)
{
SSdbOper
*
pOper
=
param
;
SWalHead
*
pHead
=
data
;
int32_t
tableId
=
pHead
->
msgType
/
10
;
...
...
@@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() {
tsSdbWriteQueue
=
NULL
;
}
int
sdbWriteToQueue
(
void
*
param
,
void
*
data
,
int
type
)
{
int
32_t
sdbWriteToQueue
(
void
*
param
,
void
*
data
,
int32_t
qtype
,
void
*
pMsg
)
{
SWalHead
*
pHead
=
data
;
int
size
=
sizeof
(
SWalHead
)
+
pHead
->
len
;
int
32_t
size
=
sizeof
(
SWalHead
)
+
pHead
->
len
;
SWalHead
*
pWal
=
(
SWalHead
*
)
taosAllocateQitem
(
size
);
memcpy
(
pWal
,
pHead
,
size
);
taosWriteQitem
(
tsSdbWriteQueue
,
type
,
pWal
);
taosWriteQitem
(
tsSdbWriteQueue
,
q
type
,
pWal
);
return
0
;
}
...
...
@@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) {
pOper
=
NULL
;
}
int32_t
code
=
sdbWrite
(
pOper
,
pHead
,
type
);
int32_t
code
=
sdbWrite
(
pOper
,
pHead
,
type
,
NULL
);
if
(
code
>
0
)
code
=
0
;
if
(
pOper
)
{
pOper
->
retCode
=
code
;
...
...
src/sync/src/syncMain.c
浏览文件 @
50cdd465
...
...
@@ -863,7 +863,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
)
{
// nodeVersion = pHead->version;
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_FWD
);
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_FWD
,
NULL
);
}
else
{
if
(
nodeSStatus
!=
TAOS_SYNC_STATUS_INIT
)
{
syncSaveIntoBuffer
(
pPeer
,
pHead
);
...
...
src/sync/src/syncRestore.c
浏览文件 @
50cdd465
...
...
@@ -154,7 +154,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
if
(
ret
<
0
)
break
;
sDebug
(
"%s, restore a record, ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_WAL
);
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_WAL
,
NULL
);
}
if
(
code
<
0
)
{
...
...
@@ -169,7 +169,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SWalHead
*
pHead
=
(
SWalHead
*
)
offset
;
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_FWD
);
(
*
pNode
->
writeToCache
)(
pNode
->
ahandle
,
pHead
,
TAOS_QTYPE_FWD
,
NULL
);
offset
+=
pHead
->
len
+
sizeof
(
SWalHead
);
return
offset
;
...
...
src/vnode/inc/vnodeInt.h
浏览文件 @
50cdd465
...
...
@@ -61,8 +61,6 @@ typedef struct {
char
db
[
TSDB_DB_NAME_LEN
];
}
SVnodeObj
;
int
vnodeWriteToQueue
(
void
*
param
,
void
*
pHead
,
int
type
);
int
vnodeWriteCqMsgToQueue
(
void
*
param
,
void
*
pHead
,
int
type
);
void
vnodeInitWriteFp
(
void
);
void
vnodeInitReadFp
(
void
);
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
50cdd465
...
...
@@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy
(
cqCfg
.
pass
,
tsInternalPass
);
strcpy
(
cqCfg
.
db
,
pVnode
->
db
);
cqCfg
.
vgId
=
vnode
;
cqCfg
.
cqWrite
=
vnodeWrite
CqMsg
ToQueue
;
cqCfg
.
cqWrite
=
vnodeWriteToQueue
;
pVnode
->
cq
=
cqOpen
(
pVnode
,
&
cqCfg
);
if
(
pVnode
->
cq
==
NULL
)
{
vnodeCleanUp
(
pVnode
);
...
...
@@ -365,6 +365,7 @@ int32_t vnodeClose(int32_t vgId) {
}
void
vnodeRelease
(
void
*
pVnodeRaw
)
{
if
(
pVnodeRaw
==
NULL
)
return
;
SVnodeObj
*
pVnode
=
pVnodeRaw
;
int32_t
vgId
=
pVnode
->
vgId
;
...
...
@@ -482,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) {
return
pVnode
->
rqueue
;
}
void
*
vnodeAcquireWqueue
(
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
return
NULL
;
int32_t
code
=
vnodeCheckWrite
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
vInfo
(
"vgId:%d, can not provide write service, status is %s"
,
vgId
,
vnodeStatus
[
pVnode
->
status
]);
vnodeRelease
(
pVnode
);
return
NULL
;
}
return
pVnode
->
wqueue
;
}
void
*
vnodeGetWal
(
void
*
pVnode
)
{
return
((
SVnodeObj
*
)
pVnode
)
->
wal
;
}
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
50cdd465
...
...
@@ -46,10 +46,10 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_UPDATE_TAG_VAL
]
=
vnodeProcessUpdateTagValMsg
;
}
int32_t
vnodeProcessWrite
(
void
*
param
1
,
int
qtype
,
void
*
param2
,
void
*
item
)
{
int32_t
vnodeProcessWrite
(
void
*
param
,
int32_t
qtype
,
SVWriteMsg
*
pWrite
)
{
int32_t
code
=
0
;
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
param1
;
SWalHead
*
pHead
=
p
aram2
;
SVnodeObj
*
pVnode
=
param
;
SWalHead
*
pHead
=
p
Write
->
pHead
;
if
(
vnodeProcessWriteMsgFp
[
pHead
->
msgType
]
==
NULL
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, no handle"
,
pVnode
->
vgId
,
taosMsg
[
pHead
->
msgType
]);
...
...
@@ -80,7 +80,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t
syncCode
=
0
;
syncCode
=
syncForwardToPeer
(
pVnode
->
sync
,
pHead
,
item
,
qtype
);
syncCode
=
syncForwardToPeer
(
pVnode
->
sync
,
pHead
,
&
pWrite
->
rspRet
,
qtype
);
if
(
syncCode
<
0
)
return
syncCode
;
// write into WAL
...
...
@@ -90,7 +90,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
pVnode
->
version
=
pHead
->
version
;
// write data locally
code
=
(
*
vnodeProcessWriteMsgFp
[
pHead
->
msgType
])(
pVnode
,
pHead
->
cont
,
item
);
code
=
(
*
vnodeProcessWriteMsgFp
[
pHead
->
msgType
])(
pVnode
,
pHead
->
cont
,
&
pWrite
->
rspRet
);
if
(
code
<
0
)
return
code
;
return
syncCode
;
...
...
@@ -204,35 +204,32 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return
TSDB_CODE_SUCCESS
;
}
int
vnodeWriteCqMsgToQueue
(
void
*
param
,
void
*
data
,
int
type
)
{
SVnodeObj
*
pVnode
=
param
;
SWalHead
*
pHead
=
data
;
int
size
=
sizeof
(
SWalHead
)
+
pHead
->
len
;
SSyncHead
*
pSync
=
(
SSyncHead
*
)
taosAllocateQitem
(
size
+
sizeof
(
SSyncHead
));
SWalHead
*
pWal
=
(
SWalHead
*
)(
pSync
+
1
);
memcpy
(
pWal
,
pHead
,
size
);
int32_t
vnodeWriteToQueue
(
void
*
vparam
,
void
*
wparam
,
int32_t
qtype
,
void
*
pMsg
)
{
SVnodeObj
*
pVnode
=
vparam
;
SWalHead
*
pHead
=
wparam
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
vTrace
(
"CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
taosWriteQitem
(
pVnode
->
wqueue
,
type
,
pSync
);
if
(
qtype
==
TAOS_QTYPE_RPC
)
{
int32_t
code
=
vnodeCheckWrite
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
return
code
;
}
return
0
;
}
int32_t
size
=
sizeof
(
SVWriteMsg
)
+
sizeof
(
SWalHead
)
+
pHead
->
len
;
SVWriteMsg
*
pWrite
=
taosAllocateQitem
(
size
);
if
(
pWrite
==
NULL
)
{
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
}
int
vnodeWriteToQueue
(
void
*
param
,
void
*
data
,
int
type
)
{
SVnodeObj
*
pVnode
=
param
;
SWalHead
*
pHead
=
data
;
if
(
pMsg
!=
NULL
)
{
SRpcMsg
*
pRpcMsg
=
pMsg
;
pWrite
->
rpcHandle
=
pRpcMsg
->
handle
;
pWrite
->
rpcAhandle
=
pRpcMsg
->
ahandle
;
}
int
size
=
sizeof
(
SWalHead
)
+
pHead
->
len
;
SWalHead
*
pWal
=
(
SWalHead
*
)
taosAllocateQitem
(
size
);
memcpy
(
pWal
,
pHead
,
size
);
memcpy
(
pWrite
->
pHead
,
pHead
,
sizeof
(
SWalHead
)
+
pHead
->
len
);
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
vTrace
(
"vgId:%d, get vnode wqueue, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
taosWriteQitem
(
pVnode
->
wqueue
,
type
,
pWal
);
return
0
;
taosWriteQitem
(
pVnode
->
wqueue
,
qtype
,
pWrite
);
return
TSDB_CODE_SUCCESS
;
}
src/wal/src/walWrite.c
浏览文件 @
50cdd465
...
...
@@ -122,7 +122,7 @@ void walFsync(void *handle) {
}
}
int32_t
walRestore
(
void
*
handle
,
void
*
pVnode
,
int32_t
(
*
writeFp
)(
void
*
,
void
*
,
int32_t
)
)
{
int32_t
walRestore
(
void
*
handle
,
void
*
pVnode
,
FWalWrite
writeFp
)
{
if
(
handle
==
NULL
)
return
-
1
;
SWal
*
pWal
=
handle
;
...
...
@@ -307,7 +307,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if
(
pWal
->
keep
)
pWal
->
version
=
pHead
->
version
;
(
*
writeFp
)(
pVnode
,
pHead
,
TAOS_QTYPE_WAL
);
(
*
writeFp
)(
pVnode
,
pHead
,
TAOS_QTYPE_WAL
,
NULL
);
}
tclose
(
fd
);
...
...
src/wal/test/waltest.c
浏览文件 @
50cdd465
...
...
@@ -23,7 +23,7 @@
int64_t
ver
=
0
;
void
*
pWal
=
NULL
;
int
writeToQueue
(
void
*
pVnode
,
void
*
data
,
int
type
)
{
int
writeToQueue
(
void
*
pVnode
,
void
*
data
,
int
type
,
void
*
pMsg
)
{
// do nothing
SWalHead
*
pHead
=
data
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录