Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9221e6f4
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9221e6f4
编写于
4月 14, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact vnode
上级
0a2b62e3
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
514 addition
and
348 deletion
+514
-348
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+2
-1
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+1
-0
source/dnode/vnode/src/vnd/vnodeBufferPool2.c
source/dnode/vnode/src/vnd/vnodeBufferPool2.c
+149
-0
source/dnode/vnode/src/vnd/vnodeInt.c
source/dnode/vnode/src/vnd/vnodeInt.c
+1
-22
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+17
-52
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+344
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+0
-273
未找到文件。
source/dnode/vnode/CMakeLists.txt
浏览文件 @
9221e6f4
...
@@ -6,6 +6,7 @@ target_sources(
...
@@ -6,6 +6,7 @@ target_sources(
# vnode
# vnode
"src/vnd/vnodeArenaMAImpl.c"
"src/vnd/vnodeArenaMAImpl.c"
"src/vnd/vnodeBufferPool.c"
"src/vnd/vnodeBufferPool.c"
# "src/vnd/vnodeBufferPool2.c"
"src/vnd/vnodeCfg.c"
"src/vnd/vnodeCfg.c"
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeInt.c"
"src/vnd/vnodeInt.c"
...
@@ -14,7 +15,7 @@ target_sources(
...
@@ -14,7 +15,7 @@ target_sources(
"src/vnd/vnodeStateMgr.c"
"src/vnd/vnodeStateMgr.c"
"src/vnd/vnodeWrite.c"
"src/vnd/vnodeWrite.c"
"src/vnd/vnodeModule.c"
"src/vnd/vnodeModule.c"
# "src/vnd/vnodeMg
r.c"
"src/vnd/vnodeSv
r.c"
# meta
# meta
# "src/meta/metaBDBImpl.c"
# "src/meta/metaBDBImpl.c"
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
9221e6f4
...
@@ -36,6 +36,7 @@ int vnodeScheduleTask(int (*execute)(void*), void* arg);
...
@@ -36,6 +36,7 @@ int vnodeScheduleTask(int (*execute)(void*), void* arg);
// vnodeQuery ====================
// vnodeQuery ====================
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
int
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
#if 1
#if 1
// SVBufPool
// SVBufPool
...
...
source/dnode/vnode/src/vnd/vnodeBufferPool2.c
0 → 100644
浏览文件 @
9221e6f4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
/* ------------------------ STRUCTURES ------------------------ */
static
int
vnodeBufPoolCreate
(
int
size
,
SVBufPool
**
ppPool
);
static
int
vnodeBufPoolDestroy
(
SVBufPool
*
pPool
);
int
vnodeOpenBufPool
(
SVnode
*
pVnode
,
int64_t
size
)
{
SVBufPool
*
pPool
=
NULL
;
int
ret
;
ASSERT
(
pVnode
->
pPool
==
NULL
);
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
// create pool
ret
=
vnodeBufPoolCreate
(
size
,
&
pPool
);
if
(
ret
<
0
)
{
vError
(
"vgId:%d failed to open vnode buffer pool since %s"
,
TD_VNODE_ID
(
pVnode
),
tstrerror
(
terrno
));
vnodeCloseBufPool
(
pVnode
);
return
-
1
;
}
// add pool to queue
pPool
->
next
=
pVnode
->
pPool
;
pVnode
->
pPool
=
pPool
;
}
vDebug
(
"vgId:%d vnode buffer pool is opened, pool size: %"
PRId64
,
TD_VNODE_ID
(
pVnode
),
size
);
return
0
;
}
int
vnodeCloseBufPool
(
SVnode
*
pVnode
)
{
SVBufPool
*
pPool
;
for
(
pPool
=
pVnode
->
pPool
;
pPool
;
pPool
=
pVnode
->
pPool
)
{
pVnode
->
pPool
=
pPool
->
next
;
vnodeBufPoolDestroy
(
pPool
);
}
vDebug
(
"vgId:%d vnode buffer pool is closed"
,
TD_VNODE_ID
(
pVnode
));
return
0
;
}
void
vnodeBufPoolReset
(
SVBufPool
*
pPool
)
{
SVBufPoolNode
*
pNode
;
for
(
pNode
=
pPool
->
pTail
;
pNode
->
prev
;
pNode
=
pPool
->
pTail
)
{
ASSERT
(
pNode
->
pnext
==
&
pPool
->
pTail
);
pNode
->
prev
->
pnext
=
&
pPool
->
pTail
;
pPool
->
pTail
=
pNode
->
prev
;
pPool
->
size
=
pPool
->
size
-
sizeof
(
*
pNode
)
-
pNode
->
size
;
taosMemoryFree
(
pNode
);
}
ASSERT
(
pPool
->
size
==
pPool
->
ptr
-
pPool
->
node
.
data
);
pPool
->
size
=
0
;
pPool
->
ptr
=
pPool
->
node
.
data
;
}
void
*
vnodeBufPoolMalloc
(
SVBufPool
*
pPool
,
size_t
size
)
{
SVBufPoolNode
*
pNode
;
void
*
p
;
if
(
pPool
->
node
.
size
>=
pPool
->
ptr
-
pPool
->
node
.
data
+
size
)
{
// allocate from the anchor node
p
=
pPool
->
ptr
;
pPool
->
ptr
=
pPool
->
ptr
+
size
;
pPool
->
size
+=
size
;
}
else
{
// allocate a new node
pNode
=
taosMemoryMalloc
(
sizeof
(
*
pNode
)
+
size
);
if
(
pNode
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
p
=
pNode
->
data
;
pNode
->
size
=
size
;
pNode
->
prev
=
pPool
->
pTail
;
pNode
->
pnext
=
&
pPool
->
pTail
;
pPool
->
pTail
->
pnext
=
&
pNode
->
prev
;
pPool
->
pTail
=
pNode
;
pPool
->
size
=
pPool
->
size
+
sizeof
(
*
pNode
)
+
size
;
}
return
p
;
}
void
vnodeBufPoolFree
(
SVBufPool
*
pPool
,
void
*
p
)
{
uint8_t
*
ptr
=
(
uint8_t
*
)
p
;
SVBufPoolNode
*
pNode
;
if
(
ptr
<
pPool
->
node
.
data
||
ptr
>=
pPool
->
node
.
data
+
pPool
->
node
.
size
)
{
pNode
=
&
((
SVBufPoolNode
*
)
p
)[
-
1
];
*
pNode
->
pnext
=
pNode
->
prev
;
pNode
->
prev
->
pnext
=
pNode
->
pnext
;
pPool
->
size
=
pPool
->
size
-
sizeof
(
*
pNode
)
-
pNode
->
size
;
taosMemoryFree
(
pNode
);
}
}
// STATIC METHODS -------------------
static
int
vnodeBufPoolCreate
(
int
size
,
SVBufPool
**
ppPool
)
{
SVBufPool
*
pPool
;
pPool
=
taosMemoryMalloc
(
sizeof
(
SVBufPool
)
+
size
);
if
(
pPool
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pPool
->
next
=
NULL
;
pPool
->
nRef
=
0
;
pPool
->
size
=
0
;
pPool
->
ptr
=
pPool
->
node
.
data
;
pPool
->
pTail
=
&
pPool
->
node
;
pPool
->
node
.
prev
=
NULL
;
pPool
->
node
.
pnext
=
&
pPool
->
pTail
;
pPool
->
node
.
size
=
size
;
*
ppPool
=
pPool
;
return
0
;
}
static
int
vnodeBufPoolDestroy
(
SVBufPool
*
pPool
)
{
vnodeBufPoolReset
(
pPool
);
taosMemoryFree
(
pPool
);
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeInt.c
浏览文件 @
9221e6f4
...
@@ -21,25 +21,4 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
...
@@ -21,25 +21,4 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
int32_t
vnodeCompact
(
SVnode
*
pVnode
)
{
return
0
;
}
int32_t
vnodeCompact
(
SVnode
*
pVnode
)
{
return
0
;
}
int32_t
vnodeSync
(
SVnode
*
pVnode
)
{
return
0
;
}
int32_t
vnodeSync
(
SVnode
*
pVnode
)
{
return
0
;
}
\ No newline at end of file
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
)
{
pLoad
->
vgId
=
pVnode
->
vgId
;
pLoad
->
role
=
TAOS_SYNC_STATE_LEADER
;
pLoad
->
numOfTables
=
metaGetTbNum
(
pVnode
->
pMeta
);
pLoad
->
numOfTimeSeries
=
400
;
pLoad
->
totalStorage
=
300
;
pLoad
->
compStorage
=
200
;
pLoad
->
pointsWritten
=
100
;
pLoad
->
numOfSelectReqs
=
1
;
pLoad
->
numOfInsertReqs
=
3
;
pLoad
->
numOfInsertSuccessReqs
=
2
;
pLoad
->
numOfBatchInsertReqs
=
5
;
pLoad
->
numOfBatchInsertSuccessReqs
=
4
;
return
0
;
}
int
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
/*vInfo("sync message is processed");*/
return
0
;
}
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
9221e6f4
...
@@ -15,64 +15,13 @@
...
@@ -15,64 +15,13 @@
#include "vnodeInt.h"
#include "vnodeInt.h"
static
int
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int
vnodeQueryOpen
(
SVnode
*
pVnode
)
{
int
vnodeQueryOpen
(
SVnode
*
pVnode
)
{
return
qWorkerInit
(
NODE_TYPE_VNODE
,
pVnode
->
vgId
,
NULL
,
(
void
**
)
&
pVnode
->
pQuery
,
&
pVnode
->
msgCb
);
return
qWorkerInit
(
NODE_TYPE_VNODE
,
pVnode
->
vgId
,
NULL
,
(
void
**
)
&
pVnode
->
pQuery
,
&
pVnode
->
msgCb
);
}
}
void
vnodeQueryClose
(
SVnode
*
pVnode
)
{
qWorkerDestroy
((
void
**
)
&
pVnode
->
pQuery
);
}
void
vnodeQueryClose
(
SVnode
*
pVnode
)
{
qWorkerDestroy
((
void
**
)
&
pVnode
->
pQuery
);
}
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in query queue is processing"
);
SReadHandle
handle
=
{.
reader
=
pVnode
->
pTsdb
,
.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
return
qWorkerProcessQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_QUERY_CONTINUE
:
return
qWorkerProcessCQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
);
default:
vError
(
"unknown msg type:%d in query queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
}
}
int
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
)
{
vTrace
(
"message in fetch queue is processing"
);
char
*
msgstr
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_FETCH
:
return
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_FETCH_RSP
:
return
qWorkerProcessFetchRsp
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_RES_READY
:
return
qWorkerProcessReadyMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_TASKS_STATUS
:
return
qWorkerProcessStatusMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_CANCEL_TASK
:
return
qWorkerProcessCancelMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_DROP_TASK
:
return
qWorkerProcessDropMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_TABLE_META
:
return
vnodeGetTableMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
,
pInfo
->
workerId
);
case
TDMT_VND_TASK_PIPE_EXEC
:
case
TDMT_VND_TASK_MERGE_EXEC
:
return
tqProcessTaskExec
(
pVnode
->
pTq
,
msgstr
,
msgLen
,
0
);
case
TDMT_VND_STREAM_TRIGGER
:
return
tqProcessStreamTrigger
(
pVnode
->
pTq
,
pMsg
->
pCont
,
pMsg
->
contLen
,
0
);
case
TDMT_VND_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
default:
vError
(
"unknown msg type:%d in fetch queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
}
}
static
int
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
STbCfg
*
pTbCfg
=
NULL
;
STbCfg
*
pTbCfg
=
NULL
;
STbCfg
*
pStbCfg
=
NULL
;
STbCfg
*
pStbCfg
=
NULL
;
tb_uid_t
uid
;
tb_uid_t
uid
;
...
@@ -200,3 +149,19 @@ _exit:
...
@@ -200,3 +149,19 @@ _exit:
tmsgSendRsp
(
&
rpcMsg
);
tmsgSendRsp
(
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
)
{
pLoad
->
vgId
=
pVnode
->
vgId
;
pLoad
->
role
=
TAOS_SYNC_STATE_LEADER
;
pLoad
->
numOfTables
=
metaGetTbNum
(
pVnode
->
pMeta
);
pLoad
->
numOfTimeSeries
=
400
;
pLoad
->
totalStorage
=
300
;
pLoad
->
compStorage
=
200
;
pLoad
->
pointsWritten
=
100
;
pLoad
->
numOfSelectReqs
=
1
;
pLoad
->
numOfInsertReqs
=
3
;
pLoad
->
numOfInsertSuccessReqs
=
2
;
pLoad
->
numOfBatchInsertReqs
=
5
;
pLoad
->
numOfBatchInsertSuccessReqs
=
4
;
return
0
;
}
source/dnode/vnode/src/vnd/vnodeSvr.c
0 → 100644
浏览文件 @
9221e6f4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
void
vnodeProcessWMsgs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
)
{
SNodeMsg
*
pMsg
;
SRpcMsg
*
pRpc
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pMsgs
);
i
++
)
{
pMsg
=
*
(
SNodeMsg
**
)
taosArrayGet
(
pMsgs
,
i
);
pRpc
=
&
pMsg
->
rpcMsg
;
// set request version
void
*
pBuf
=
POINTER_SHIFT
(
pRpc
->
pCont
,
sizeof
(
SMsgHead
));
int64_t
ver
=
pVnode
->
state
.
processed
++
;
taosEncodeFixedI64
(
&
pBuf
,
ver
);
if
(
walWrite
(
pVnode
->
pWal
,
ver
,
pRpc
->
msgType
,
pRpc
->
pCont
,
pRpc
->
contLen
)
<
0
)
{
// TODO: handle error
/*ASSERT(false);*/
vError
(
"vnode:%d write wal error since %s"
,
pVnode
->
vgId
,
terrstr
());
}
}
walFsync
(
pVnode
->
pWal
,
false
);
// TODO: Integrate RAFT module here
// No results are returned because error handling is difficult
// return 0;
}
int
vnodeApplyWMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
void
*
ptr
=
NULL
;
if
(
pVnode
->
config
.
streamMode
==
0
)
{
ptr
=
vnodeMalloc
(
pVnode
,
pMsg
->
contLen
);
if
(
ptr
==
NULL
)
{
// TODO: handle error
}
// TODO: copy here need to be extended
memcpy
(
ptr
,
pMsg
->
pCont
,
pMsg
->
contLen
);
}
// todo: change the interface here
int64_t
ver
;
taosDecodeFixedI64
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
ver
);
if
(
tqPushMsg
(
pVnode
->
pTq
,
pMsg
->
pCont
,
pMsg
->
contLen
,
pMsg
->
msgType
,
ver
)
<
0
)
{
// TODO: handle error
}
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_CREATE_STB
:
{
SVCreateTbReq
vCreateTbReq
=
{
0
};
tDeserializeSVCreateTbReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vCreateTbReq
);
if
(
metaCreateTable
(
pVnode
->
pMeta
,
&
(
vCreateTbReq
))
<
0
)
{
// TODO: handle error
}
// TODO: to encapsule a free API
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pSchema
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pTagSchema
);
if
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
);
}
taosMemoryFree
(
vCreateTbReq
.
dbFName
);
taosMemoryFree
(
vCreateTbReq
.
name
);
break
;
}
case
TDMT_VND_CREATE_TABLE
:
{
SVCreateTbBatchReq
vCreateTbBatchReq
=
{
0
};
SVCreateTbBatchRsp
vCreateTbBatchRsp
=
{
0
};
tDeserializeSVCreateTbBatchReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vCreateTbBatchReq
);
int
reqNum
=
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
);
for
(
int
i
=
0
;
i
<
reqNum
;
i
++
)
{
SVCreateTbReq
*
pCreateTbReq
=
taosArrayGet
(
vCreateTbBatchReq
.
pArray
,
i
);
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
SMsgHead
*
pHead
=
(
SMsgHead
*
)
pMsg
->
pCont
;
sprintf
(
tableFName
,
"%s.%s"
,
pCreateTbReq
->
dbFName
,
pCreateTbReq
->
name
);
int32_t
code
=
vnodeValidateTableHash
(
&
pVnode
->
config
,
tableFName
);
if
(
code
)
{
SVCreateTbRsp
rsp
;
rsp
.
code
=
code
;
taosArrayPush
(
vCreateTbBatchRsp
.
rspList
,
&
rsp
);
}
if
(
metaCreateTable
(
pVnode
->
pMeta
,
pCreateTbReq
)
<
0
)
{
// TODO: handle error
vError
(
"vgId:%d, failed to create table: %s"
,
pVnode
->
vgId
,
pCreateTbReq
->
name
);
}
// TODO: to encapsule a free API
taosMemoryFree
(
pCreateTbReq
->
name
);
taosMemoryFree
(
pCreateTbReq
->
dbFName
);
if
(
pCreateTbReq
->
type
==
TD_SUPER_TABLE
)
{
taosMemoryFree
(
pCreateTbReq
->
stbCfg
.
pSchema
);
taosMemoryFree
(
pCreateTbReq
->
stbCfg
.
pTagSchema
);
if
(
pCreateTbReq
->
stbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
pCreateTbReq
->
stbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
pCreateTbReq
->
stbCfg
.
pRSmaParam
);
}
}
else
if
(
pCreateTbReq
->
type
==
TD_CHILD_TABLE
)
{
taosMemoryFree
(
pCreateTbReq
->
ctbCfg
.
pTag
);
}
else
{
taosMemoryFree
(
pCreateTbReq
->
ntbCfg
.
pSchema
);
if
(
pCreateTbReq
->
ntbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
pCreateTbReq
->
ntbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
pCreateTbReq
->
ntbCfg
.
pRSmaParam
);
}
}
}
vTrace
(
"vgId:%d process create %"
PRIzu
" tables"
,
pVnode
->
vgId
,
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
));
taosArrayDestroy
(
vCreateTbBatchReq
.
pArray
);
if
(
vCreateTbBatchRsp
.
rspList
)
{
int32_t
contLen
=
tSerializeSVCreateTbBatchRsp
(
NULL
,
0
,
&
vCreateTbBatchRsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSVCreateTbBatchRsp
(
msg
,
contLen
,
&
vCreateTbBatchRsp
);
taosArrayDestroy
(
vCreateTbBatchRsp
.
rspList
);
*
pRsp
=
taosMemoryCalloc
(
1
,
sizeof
(
SRpcMsg
));
(
*
pRsp
)
->
msgType
=
TDMT_VND_CREATE_TABLE_RSP
;
(
*
pRsp
)
->
pCont
=
msg
;
(
*
pRsp
)
->
contLen
=
contLen
;
(
*
pRsp
)
->
handle
=
pMsg
->
handle
;
(
*
pRsp
)
->
ahandle
=
pMsg
->
ahandle
;
}
break
;
}
case
TDMT_VND_ALTER_STB
:
{
SVCreateTbReq
vAlterTbReq
=
{
0
};
vTrace
(
"vgId:%d, process alter stb req"
,
pVnode
->
vgId
);
tDeserializeSVCreateTbReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vAlterTbReq
);
// TODO: to encapsule a free API
taosMemoryFree
(
vAlterTbReq
.
stbCfg
.
pSchema
);
taosMemoryFree
(
vAlterTbReq
.
stbCfg
.
pTagSchema
);
if
(
vAlterTbReq
.
stbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
vAlterTbReq
.
stbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
vAlterTbReq
.
stbCfg
.
pRSmaParam
);
}
taosMemoryFree
(
vAlterTbReq
.
dbFName
);
taosMemoryFree
(
vAlterTbReq
.
name
);
break
;
}
case
TDMT_VND_DROP_STB
:
vTrace
(
"vgId:%d, process drop stb req"
,
pVnode
->
vgId
);
break
;
case
TDMT_VND_DROP_TABLE
:
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// // TODO: handle error
// }
break
;
case
TDMT_VND_SUBMIT
:
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/
if
(
pVnode
->
config
.
streamMode
==
0
)
{
if
(
tsdbInsertData
(
pVnode
->
pTsdb
,
(
SSubmitReq
*
)
ptr
,
NULL
)
<
0
)
{
// TODO: handle error
}
}
break
;
case
TDMT_VND_MQ_SET_CONN
:
{
if
(
tqProcessSetConnReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO: handle error
}
}
break
;
case
TDMT_VND_MQ_REB
:
{
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
case
TDMT_VND_MQ_CANCEL_CONN
:
{
if
(
tqProcessCancelConnReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
case
TDMT_VND_TASK_DEPLOY
:
{
if
(
tqProcessTaskDeploy
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
case
TDMT_VND_TASK_WRITE_EXEC
:
{
if
(
tqProcessTaskExec
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
),
0
)
<
0
)
{
}
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
#if 0
SSmaCfg vCreateSmaReq = {0};
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
vWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId,
terrstr(terrno));
return -1;
}
vDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, pVnode->config.vgId,
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid);
// record current timezone of server side
vCreateSmaReq.tSma.timezoneInt = tsTimezone;
if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) {
// TODO: handle error
tdDestroyTSma(&vCreateSmaReq.tSma);
return -1;
}
tsdbTSmaAdd(pVnode->pTsdb, 1);
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
#endif
if
(
tsdbCreateTSma
(
pVnode
->
pTsdb
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO
}
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
// case TDMT_VND_DROP_SMA: { // timeRangeSMA
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// // TODO
// }
#if 0
tsdbTSmaSub(pVnode->pTsdb, 1);
SVDropTSmaReq vDropSmaReq = {0};
if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
// TODO: send msg to stream computing to drop tSma
// if ((send msg to stream computing) < 0) {
// tdDestroyTSma(&vCreateSmaReq);
// return -1;
// }
//
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexUid) < 0) {
// TODO: handle error
return -1;
}
if(tsdbDropTSmaData(pVnode->pTsdb, vDropSmaReq.indexUid) < 0) {
// TODO: handle error
return -1;
}
// TODO: return directly or go on follow steps?
#endif
}
break
;
default:
ASSERT
(
0
);
break
;
}
pVnode
->
state
.
applied
=
ver
;
// Check if it needs to commit
if
(
vnodeShouldCommit
(
pVnode
))
{
// tsem_wait(&(pVnode->canCommit));
if
(
vnodeAsyncCommit
(
pVnode
)
<
0
)
{
// TODO: handle error
}
}
return
0
;
}
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in query queue is processing"
);
SReadHandle
handle
=
{.
reader
=
pVnode
->
pTsdb
,
.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
return
qWorkerProcessQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_QUERY_CONTINUE
:
return
qWorkerProcessCQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
);
default:
vError
(
"unknown msg type:%d in query queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
}
}
int
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
)
{
vTrace
(
"message in fetch queue is processing"
);
char
*
msgstr
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_FETCH
:
return
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_FETCH_RSP
:
return
qWorkerProcessFetchRsp
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_RES_READY
:
return
qWorkerProcessReadyMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_TASKS_STATUS
:
return
qWorkerProcessStatusMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_CANCEL_TASK
:
return
qWorkerProcessCancelMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_DROP_TASK
:
return
qWorkerProcessDropMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_TABLE_META
:
return
vnodeGetTableMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
,
pInfo
->
workerId
);
case
TDMT_VND_TASK_PIPE_EXEC
:
case
TDMT_VND_TASK_MERGE_EXEC
:
return
tqProcessTaskExec
(
pVnode
->
pTq
,
msgstr
,
msgLen
,
0
);
case
TDMT_VND_STREAM_TRIGGER
:
return
tqProcessStreamTrigger
(
pVnode
->
pTq
,
pMsg
->
pCont
,
pMsg
->
contLen
,
0
);
case
TDMT_VND_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
default:
vError
(
"unknown msg type:%d in fetch queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
}
}
// TODO: remove the function
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
// blockDebugShowData(data);
tsdbInsertTSmaData
(((
SVnode
*
)
pVnode
)
->
pTsdb
,
smaId
,
(
const
char
*
)
data
);
}
int
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
/*vInfo("sync message is processed");*/
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
9221e6f4
...
@@ -15,277 +15,4 @@
...
@@ -15,277 +15,4 @@
#include "vnodeInt.h"
#include "vnodeInt.h"
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
// blockDebugShowData(data);
tsdbInsertTSmaData
(((
SVnode
*
)
pVnode
)
->
pTsdb
,
smaId
,
(
const
char
*
)
data
);
}
void
vnodeProcessWMsgs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
)
{
SNodeMsg
*
pMsg
;
SRpcMsg
*
pRpc
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pMsgs
);
i
++
)
{
pMsg
=
*
(
SNodeMsg
**
)
taosArrayGet
(
pMsgs
,
i
);
pRpc
=
&
pMsg
->
rpcMsg
;
// set request version
void
*
pBuf
=
POINTER_SHIFT
(
pRpc
->
pCont
,
sizeof
(
SMsgHead
));
int64_t
ver
=
pVnode
->
state
.
processed
++
;
taosEncodeFixedI64
(
&
pBuf
,
ver
);
if
(
walWrite
(
pVnode
->
pWal
,
ver
,
pRpc
->
msgType
,
pRpc
->
pCont
,
pRpc
->
contLen
)
<
0
)
{
// TODO: handle error
/*ASSERT(false);*/
vError
(
"vnode:%d write wal error since %s"
,
pVnode
->
vgId
,
terrstr
());
}
}
walFsync
(
pVnode
->
pWal
,
false
);
// TODO: Integrate RAFT module here
// No results are returned because error handling is difficult
// return 0;
}
int
vnodeApplyWMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
void
*
ptr
=
NULL
;
if
(
pVnode
->
config
.
streamMode
==
0
)
{
ptr
=
vnodeMalloc
(
pVnode
,
pMsg
->
contLen
);
if
(
ptr
==
NULL
)
{
// TODO: handle error
}
// TODO: copy here need to be extended
memcpy
(
ptr
,
pMsg
->
pCont
,
pMsg
->
contLen
);
}
// todo: change the interface here
int64_t
ver
;
taosDecodeFixedI64
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
ver
);
if
(
tqPushMsg
(
pVnode
->
pTq
,
pMsg
->
pCont
,
pMsg
->
contLen
,
pMsg
->
msgType
,
ver
)
<
0
)
{
// TODO: handle error
}
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_CREATE_STB
:
{
SVCreateTbReq
vCreateTbReq
=
{
0
};
tDeserializeSVCreateTbReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vCreateTbReq
);
if
(
metaCreateTable
(
pVnode
->
pMeta
,
&
(
vCreateTbReq
))
<
0
)
{
// TODO: handle error
}
// TODO: to encapsule a free API
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pSchema
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pTagSchema
);
if
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
);
}
taosMemoryFree
(
vCreateTbReq
.
dbFName
);
taosMemoryFree
(
vCreateTbReq
.
name
);
break
;
}
case
TDMT_VND_CREATE_TABLE
:
{
SVCreateTbBatchReq
vCreateTbBatchReq
=
{
0
};
SVCreateTbBatchRsp
vCreateTbBatchRsp
=
{
0
};
tDeserializeSVCreateTbBatchReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vCreateTbBatchReq
);
int
reqNum
=
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
);
for
(
int
i
=
0
;
i
<
reqNum
;
i
++
)
{
SVCreateTbReq
*
pCreateTbReq
=
taosArrayGet
(
vCreateTbBatchReq
.
pArray
,
i
);
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
SMsgHead
*
pHead
=
(
SMsgHead
*
)
pMsg
->
pCont
;
sprintf
(
tableFName
,
"%s.%s"
,
pCreateTbReq
->
dbFName
,
pCreateTbReq
->
name
);
int32_t
code
=
vnodeValidateTableHash
(
&
pVnode
->
config
,
tableFName
);
if
(
code
)
{
SVCreateTbRsp
rsp
;
rsp
.
code
=
code
;
taosArrayPush
(
vCreateTbBatchRsp
.
rspList
,
&
rsp
);
}
if
(
metaCreateTable
(
pVnode
->
pMeta
,
pCreateTbReq
)
<
0
)
{
// TODO: handle error
vError
(
"vgId:%d, failed to create table: %s"
,
pVnode
->
vgId
,
pCreateTbReq
->
name
);
}
// TODO: to encapsule a free API
taosMemoryFree
(
pCreateTbReq
->
name
);
taosMemoryFree
(
pCreateTbReq
->
dbFName
);
if
(
pCreateTbReq
->
type
==
TD_SUPER_TABLE
)
{
taosMemoryFree
(
pCreateTbReq
->
stbCfg
.
pSchema
);
taosMemoryFree
(
pCreateTbReq
->
stbCfg
.
pTagSchema
);
if
(
pCreateTbReq
->
stbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
pCreateTbReq
->
stbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
pCreateTbReq
->
stbCfg
.
pRSmaParam
);
}
}
else
if
(
pCreateTbReq
->
type
==
TD_CHILD_TABLE
)
{
taosMemoryFree
(
pCreateTbReq
->
ctbCfg
.
pTag
);
}
else
{
taosMemoryFree
(
pCreateTbReq
->
ntbCfg
.
pSchema
);
if
(
pCreateTbReq
->
ntbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
pCreateTbReq
->
ntbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
pCreateTbReq
->
ntbCfg
.
pRSmaParam
);
}
}
}
vTrace
(
"vgId:%d process create %"
PRIzu
" tables"
,
pVnode
->
vgId
,
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
));
taosArrayDestroy
(
vCreateTbBatchReq
.
pArray
);
if
(
vCreateTbBatchRsp
.
rspList
)
{
int32_t
contLen
=
tSerializeSVCreateTbBatchRsp
(
NULL
,
0
,
&
vCreateTbBatchRsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSVCreateTbBatchRsp
(
msg
,
contLen
,
&
vCreateTbBatchRsp
);
taosArrayDestroy
(
vCreateTbBatchRsp
.
rspList
);
*
pRsp
=
taosMemoryCalloc
(
1
,
sizeof
(
SRpcMsg
));
(
*
pRsp
)
->
msgType
=
TDMT_VND_CREATE_TABLE_RSP
;
(
*
pRsp
)
->
pCont
=
msg
;
(
*
pRsp
)
->
contLen
=
contLen
;
(
*
pRsp
)
->
handle
=
pMsg
->
handle
;
(
*
pRsp
)
->
ahandle
=
pMsg
->
ahandle
;
}
break
;
}
case
TDMT_VND_ALTER_STB
:
{
SVCreateTbReq
vAlterTbReq
=
{
0
};
vTrace
(
"vgId:%d, process alter stb req"
,
pVnode
->
vgId
);
tDeserializeSVCreateTbReq
(
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
&
vAlterTbReq
);
// TODO: to encapsule a free API
taosMemoryFree
(
vAlterTbReq
.
stbCfg
.
pSchema
);
taosMemoryFree
(
vAlterTbReq
.
stbCfg
.
pTagSchema
);
if
(
vAlterTbReq
.
stbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
vAlterTbReq
.
stbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
vAlterTbReq
.
stbCfg
.
pRSmaParam
);
}
taosMemoryFree
(
vAlterTbReq
.
dbFName
);
taosMemoryFree
(
vAlterTbReq
.
name
);
break
;
}
case
TDMT_VND_DROP_STB
:
vTrace
(
"vgId:%d, process drop stb req"
,
pVnode
->
vgId
);
break
;
case
TDMT_VND_DROP_TABLE
:
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// // TODO: handle error
// }
break
;
case
TDMT_VND_SUBMIT
:
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/
if
(
pVnode
->
config
.
streamMode
==
0
)
{
if
(
tsdbInsertData
(
pVnode
->
pTsdb
,
(
SSubmitReq
*
)
ptr
,
NULL
)
<
0
)
{
// TODO: handle error
}
}
break
;
case
TDMT_VND_MQ_SET_CONN
:
{
if
(
tqProcessSetConnReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO: handle error
}
}
break
;
case
TDMT_VND_MQ_REB
:
{
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
case
TDMT_VND_MQ_CANCEL_CONN
:
{
if
(
tqProcessCancelConnReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
case
TDMT_VND_TASK_DEPLOY
:
{
if
(
tqProcessTaskDeploy
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
case
TDMT_VND_TASK_WRITE_EXEC
:
{
if
(
tqProcessTaskExec
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
),
0
)
<
0
)
{
}
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
#if 0
SSmaCfg vCreateSmaReq = {0};
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
vWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId,
terrstr(terrno));
return -1;
}
vDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, pVnode->config.vgId,
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid);
// record current timezone of server side
vCreateSmaReq.tSma.timezoneInt = tsTimezone;
if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) {
// TODO: handle error
tdDestroyTSma(&vCreateSmaReq.tSma);
return -1;
}
tsdbTSmaAdd(pVnode->pTsdb, 1);
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
#endif
if
(
tsdbCreateTSma
(
pVnode
->
pTsdb
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO
}
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
// case TDMT_VND_DROP_SMA: { // timeRangeSMA
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// // TODO
// }
#if 0
tsdbTSmaSub(pVnode->pTsdb, 1);
SVDropTSmaReq vDropSmaReq = {0};
if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
// TODO: send msg to stream computing to drop tSma
// if ((send msg to stream computing) < 0) {
// tdDestroyTSma(&vCreateSmaReq);
// return -1;
// }
//
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexUid) < 0) {
// TODO: handle error
return -1;
}
if(tsdbDropTSmaData(pVnode->pTsdb, vDropSmaReq.indexUid) < 0) {
// TODO: handle error
return -1;
}
// TODO: return directly or go on follow steps?
#endif
}
break
;
default:
ASSERT
(
0
);
break
;
}
pVnode
->
state
.
applied
=
ver
;
// Check if it needs to commit
if
(
vnodeShouldCommit
(
pVnode
))
{
// tsem_wait(&(pVnode->canCommit));
if
(
vnodeAsyncCommit
(
pVnode
)
<
0
)
{
// TODO: handle error
}
}
return
0
;
}
/* ------------------------ STATIC METHODS ------------------------ */
/* ------------------------ STATIC METHODS ------------------------ */
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录