Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
930dca2a
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
930dca2a
编写于
10月 28, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
10月 28, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #8467 from taosdata/feature/dnode3
Feature/dnode3
上级
caa4a1ac
18697d84
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
93 addition
and
97 deletion
+93
-97
include/server/dnode/dnode.h
include/server/dnode/dnode.h
+5
-0
include/server/vnode/vnode.h
include/server/vnode/vnode.h
+5
-0
include/util/taoserror.h
include/util/taoserror.h
+4
-4
source/server/dnode/src/dnodeInt.c
source/server/dnode/src/dnodeInt.c
+2
-1
source/server/vnode/inc/vnodeInt.h
source/server/vnode/inc/vnodeInt.h
+1
-2
source/server/vnode/src/vnodeFile.c
source/server/vnode/src/vnodeFile.c
+35
-48
source/server/vnode/src/vnodeInt.c
source/server/vnode/src/vnodeInt.c
+37
-0
source/server/vnode/src/vnodeMain.c
source/server/vnode/src/vnodeMain.c
+0
-36
source/server/vnode/src/vnodeWrite.c
source/server/vnode/src/vnodeWrite.c
+0
-2
source/util/src/terror.c
source/util/src/terror.c
+4
-4
未找到文件。
include/server/dnode/dnode.h
浏览文件 @
930dca2a
...
@@ -67,6 +67,11 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
...
@@ -67,6 +67,11 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
*/
*/
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
void
dnodeGetEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
/**
* Report the startup progress.
*/
void
dnodeReportStartup
(
char
*
name
,
char
*
desc
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
include/server/vnode/vnode.h
浏览文件 @
930dca2a
...
@@ -46,6 +46,11 @@ typedef struct {
...
@@ -46,6 +46,11 @@ typedef struct {
*/
*/
void
(
*
GetDnodeEp
)(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
void
(
*
GetDnodeEp
)(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
/**
* Report the startup progress.
*/
void
(
*
ReportStartup
)(
char
*
name
,
char
*
desc
);
}
SVnodeFp
;
}
SVnodeFp
;
typedef
struct
{
typedef
struct
{
...
...
include/util/taoserror.h
浏览文件 @
930dca2a
...
@@ -233,11 +233,11 @@ int32_t* taosGetErrno();
...
@@ -233,11 +233,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) //"Missing data file")
#define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) //"Missing data file")
#define TSDB_CODE_VND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0508) //"Out of memory")
#define TSDB_CODE_VND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0508) //"Out of memory")
#define TSDB_CODE_VND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0509) //"Unexpected generic error in vnode")
#define TSDB_CODE_VND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0509) //"Unexpected generic error in vnode")
#define TSDB_CODE_VND_INVALID_
VRESION_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid version file"
)
#define TSDB_CODE_VND_INVALID_
CFG_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid config file
)
#define TSDB_CODE_VND_I
S_FULL TAOS_DEF_ERROR_CODE(0, 0x050B) //"Database memory is full for commit failed
")
#define TSDB_CODE_VND_I
NVALID_TERM_FILE TAOS_DEF_ERROR_CODE(0, 0x050B) //"Invalid term file
")
#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full
for waiting commit
")
#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full")
#define TSDB_CODE_VND_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x050D) //"Database is dropping")
#define TSDB_CODE_VND_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x050D) //"Database is dropping")
#define TSDB_CODE_VND_IS_
BALANCING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is balanc
ing")
#define TSDB_CODE_VND_IS_
UPDATING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is updat
ing")
#define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0510) //"Database is closing")
#define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0510) //"Database is closing")
#define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended")
#define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended")
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
...
...
source/server/dnode/src/dnodeInt.c
浏览文件 @
930dca2a
...
@@ -37,7 +37,7 @@ EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
...
@@ -37,7 +37,7 @@ EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
void
dnodeSetRunStat
(
EDnStat
stat
)
{
tsDnode
.
runStatus
=
stat
;
}
void
dnodeSetRunStat
(
EDnStat
stat
)
{
tsDnode
.
runStatus
=
stat
;
}
static
void
dnodeReportStartup
(
char
*
name
,
char
*
desc
)
{
void
dnodeReportStartup
(
char
*
name
,
char
*
desc
)
{
SStartupStep
*
startup
=
&
tsDnode
.
startup
;
SStartupStep
*
startup
=
&
tsDnode
.
startup
;
tstrncpy
(
startup
->
name
,
name
,
strlen
(
startup
->
name
));
tstrncpy
(
startup
->
name
,
name
,
strlen
(
startup
->
name
));
tstrncpy
(
startup
->
desc
,
desc
,
strlen
(
startup
->
desc
));
tstrncpy
(
startup
->
desc
,
desc
,
strlen
(
startup
->
desc
));
...
@@ -58,6 +58,7 @@ static int32_t dnodeInitVnode() {
...
@@ -58,6 +58,7 @@ static int32_t dnodeInitVnode() {
para
.
fp
.
GetDnodeEp
=
dnodeGetEp
;
para
.
fp
.
GetDnodeEp
=
dnodeGetEp
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToDnode
=
dnodeSendMsgToDnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
SendMsgToMnode
=
dnodeSendMsgToMnode
;
para
.
fp
.
ReportStartup
=
dnodeReportStartup
;
return
vnodeInit
(
para
);
return
vnodeInit
(
para
);
}
}
...
...
source/server/vnode/inc/vnodeInt.h
浏览文件 @
930dca2a
...
@@ -77,8 +77,6 @@ typedef struct SVnodeCfg {
...
@@ -77,8 +77,6 @@ typedef struct SVnodeCfg {
SSyncCfg
sync
;
SSyncCfg
sync
;
}
SVnodeCfg
;
}
SVnodeCfg
;
typedef
struct
{
typedef
struct
{
int32_t
vgId
;
// global vnode group ID
int32_t
vgId
;
// global vnode group ID
int32_t
refCount
;
// reference count
int32_t
refCount
;
// reference count
...
@@ -114,6 +112,7 @@ typedef struct {
...
@@ -114,6 +112,7 @@ typedef struct {
void
vnodeSendMsgToDnode
(
struct
SRpcEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
void
vnodeSendMsgToDnode
(
struct
SRpcEpSet
*
epSet
,
struct
SRpcMsg
*
rpcMsg
);
void
vnodeSendMsgToMnode
(
struct
SRpcMsg
*
rpcMsg
);
void
vnodeSendMsgToMnode
(
struct
SRpcMsg
*
rpcMsg
);
void
vnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
void
vnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
void
vnodeReportStartup
(
char
*
name
,
char
*
desc
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/server/vnode/src/vnodeFile.c
浏览文件 @
930dca2a
...
@@ -280,90 +280,77 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
...
@@ -280,90 +280,77 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
vnodeReadTerm
(
int32_t
vgId
,
SSyncServerState
*
pState
){
int32_t
vnodeReadTerm
(
int32_t
vgId
,
SSyncServerState
*
pState
)
{
#if 0
int32_t
ret
=
TSDB_CODE_VND_APP_ERROR
;
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
maxLen
=
100
;
int32_t
maxLen
=
100
;
char * content = calloc(1, maxLen + 1);
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON * root = NULL;
cJSON
*
root
=
NULL
;
FILE * fp = NULL;
FILE
*
fp
=
NULL
;
terrno = TSDB_CODE_VND_INVALID_VRESION_FILE;
char file[TSDB_FILENAME_LEN + 30] = {0};
sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
fp = fopen(file, "r");
char
file
[
PATH_MAX
+
30
]
=
{
0
};
if (!fp) {
sprintf
(
file
,
"%s/vnode%d/term.json"
,
tsVnodeDir
,
vgId
);
if (errno != ENOENT) {
vError("vgId:%d, failed to read %s, error:%s", pVnode->vgId, file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_SUCCESS;
}
goto PARSE_VER_ERROR;
}
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
len
=
(
int32_t
)
fread
(
content
,
1
,
maxLen
,
fp
);
if
(
len
<=
0
)
{
if
(
len
<=
0
)
{
vError("vgId:%d, failed to read %s
, content is null", pVnode->
vgId, file);
vError
(
"vgId:%d, failed to read %s
since content is null"
,
vgId
,
file
);
goto PARSE_
VER
_ERROR;
goto
PARSE_
TERM
_ERROR
;
}
}
root
=
cJSON_Parse
(
content
);
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
if
(
root
==
NULL
)
{
vError("vgId:%d, failed to read %s
, invalid json format", pVnode->
vgId, file);
vError
(
"vgId:%d, failed to read %s
since invalid json format"
,
vgId
,
file
);
goto PARSE_
VER
_ERROR;
goto
PARSE_
TERM
_ERROR
;
}
}
cJSON *
ver = cJSON_GetObjectItem(root, "version
");
cJSON
*
term
=
cJSON_GetObjectItem
(
root
,
"term
"
);
if (!
ver || ver
->type != cJSON_Number) {
if
(
!
term
||
term
->
type
!=
cJSON_Number
)
{
vError("vgId:%d, failed to read %s
, version not found", pVnode->
vgId, file);
vError
(
"vgId:%d, failed to read %s
since term not found"
,
vgId
,
file
);
goto PARSE_
VER
_ERROR;
goto
PARSE_
TERM
_ERROR
;
}
}
#if 0
pState
->
term
=
(
uint64_t
)
term
->
valueint
;
pVnode->version = (uint64_t)ver->valueint;
terrno = TSDB_CODE_SUCCESS;
cJSON
*
voteFor
=
cJSON_GetObjectItem
(
root
,
"voteFor"
);
vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version);
if
(
!
voteFor
||
voteFor
->
type
!=
cJSON_Number
)
{
#endif
vError
(
"vgId:%d, failed to read %s since voteFor not found"
,
vgId
,
file
);
goto
PARSE_TERM_ERROR
;
}
pState
->
voteFor
=
(
int64_t
)
voteFor
->
valueint
;
PARSE_VER_ERROR:
vInfo
(
"vgId:%d, read %s success, voteFor:%"
PRIu64
", term:%"
PRIu64
,
vgId
,
file
,
pState
->
voteFor
,
pState
->
term
);
PARSE_TERM_ERROR:
if
(
content
!=
NULL
)
free
(
content
);
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
if
(
fp
!=
NULL
)
fclose
(
fp
);
return terrno;
return
ret
;
#endif
return
0
;
}
}
int32_t
vnodeWriteTerm
(
int32_t
vgid
,
SSyncServerState
*
pState
)
{
int32_t
vnodeWriteTerm
(
int32_t
vgId
,
SSyncServerState
*
pState
)
{
#if 0
char
file
[
PATH_MAX
+
30
]
=
{
0
};
char file[TSDB_FILENAME_LEN + 30] = {0};
sprintf
(
file
,
"%s/vnode%d/term.json"
,
tsVnodeDir
,
vgId
);
sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
FILE
*
fp
=
fopen
(
file
,
"w"
);
FILE
*
fp
=
fopen
(
file
,
"w"
);
if
(
!
fp
)
{
if
(
!
fp
)
{
vError("vgId:%d, failed to write %s
, reason:%s", pVnode->
vgId, file, strerror(errno));
vError
(
"vgId:%d, failed to write %s
since %s"
,
vgId
,
file
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
}
}
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
maxLen
=
100
;
int32_t
maxLen
=
100
;
char
*
content = calloc(1, maxLen + 1);
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
#if 0
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
term
\"
: %"
PRIu64
"
\n
"
,
pState
->
term
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
voteFor
\"
: %"
PRIu64
"
\n
"
,
pState
->
voteFor
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
#endif
fwrite
(
content
,
1
,
len
,
fp
);
fwrite
(
content
,
1
,
len
,
fp
);
taosFsyncFile
(
fileno
(
fp
));
taosFsyncFile
(
fileno
(
fp
));
fclose
(
fp
);
fclose
(
fp
);
free
(
content
);
free
(
content
);
terrno = 0;
// vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion);
vInfo
(
"vgId:%d, write %s success, voteFor:%"
PRIu64
", term:%"
PRIu64
,
vgId
,
file
,
pState
->
voteFor
,
pState
->
term
);
#endif
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
\ No newline at end of file
source/server/vnode/src/vnodeInt.c
浏览文件 @
930dca2a
...
@@ -24,6 +24,7 @@
...
@@ -24,6 +24,7 @@
static
struct
{
static
struct
{
SSteps
*
steps
;
SSteps
*
steps
;
SVnodeFp
fp
;
SVnodeFp
fp
;
void
(
*
msgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
}
tsVint
;
}
tsVint
;
void
vnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
)
{
void
vnodeGetDnodeEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
)
{
...
@@ -36,7 +37,43 @@ void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) {
...
@@ -36,7 +37,43 @@ void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) {
void
vnodeSendMsgToMnode
(
struct
SRpcMsg
*
rpcMsg
)
{
return
(
*
tsVint
.
fp
.
SendMsgToMnode
)(
rpcMsg
);
}
void
vnodeSendMsgToMnode
(
struct
SRpcMsg
*
rpcMsg
)
{
return
(
*
tsVint
.
fp
.
SendMsgToMnode
)(
rpcMsg
);
}
void
vnodeReportStartup
(
char
*
name
,
char
*
desc
)
{
(
*
tsVint
.
fp
.
ReportStartup
)(
name
,
desc
);
}
void
vnodeProcessMsg
(
SRpcMsg
*
pMsg
)
{
if
(
tsVint
.
msgFp
[
pMsg
->
msgType
])
{
(
*
tsVint
.
msgFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
assert
(
0
);
}
}
static
void
vnodeInitMsgFp
()
{
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_CREATE_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_ALTER_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_SYNC_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_COMPACT_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_DROP_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
vnodeProcessMgmtMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_CREATE_TABLE
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_DROP_TABLE
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_ALTER_TABLE
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MD_DROP_STABLE
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_UPDATE_TAG_VAL
]
=
vnodeProcessWriteMsg
;
// mq related
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MQ_CONNECT
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MQ_DISCONNECT
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MQ_ACK
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MQ_RESET
]
=
vnodeProcessWriteMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MQ_QUERY
]
=
vnodeProcessReadMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_MQ_CONSUME
]
=
vnodeProcessReadMsg
;
// mq related end
tsVint
.
msgFp
[
TSDB_MSG_TYPE_QUERY
]
=
vnodeProcessReadMsg
;
tsVint
.
msgFp
[
TSDB_MSG_TYPE_FETCH
]
=
vnodeProcessReadMsg
;
}
int32_t
vnodeInit
(
SVnodePara
para
)
{
int32_t
vnodeInit
(
SVnodePara
para
)
{
vnodeInitMsgFp
();
tsVint
.
fp
=
para
.
fp
;
tsVint
.
fp
=
para
.
fp
;
struct
SSteps
*
steps
=
taosStepInit
(
8
,
NULL
);
struct
SSteps
*
steps
=
taosStepInit
(
8
,
NULL
);
...
...
source/server/vnode/src/vnodeMain.c
浏览文件 @
930dca2a
...
@@ -44,7 +44,6 @@ static struct {
...
@@ -44,7 +44,6 @@ static struct {
SHashObj
*
hash
;
SHashObj
*
hash
;
int32_t
openVnodes
;
int32_t
openVnodes
;
int32_t
totalVnodes
;
int32_t
totalVnodes
;
void
(
*
msgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
}
tsVnode
;
}
tsVnode
;
static
bool
vnodeSetInitStatus
(
SVnode
*
pVnode
)
{
static
bool
vnodeSetInitStatus
(
SVnode
*
pVnode
)
{
...
@@ -566,34 +565,7 @@ void vnodeRelease(SVnode *pVnode) {
...
@@ -566,34 +565,7 @@ void vnodeRelease(SVnode *pVnode) {
}
}
}
}
static
void
vnodeInitMsgFp
()
{
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_CREATE_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_ALTER_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_SYNC_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_COMPACT_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_DROP_VNODE
]
=
vnodeProcessMgmtMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
vnodeProcessMgmtMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_CREATE_TABLE
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_DROP_TABLE
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_ALTER_TABLE
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MD_DROP_STABLE
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_UPDATE_TAG_VAL
]
=
vnodeProcessWriteMsg
;
// mq related
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MQ_CONNECT
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MQ_DISCONNECT
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MQ_ACK
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MQ_RESET
]
=
vnodeProcessWriteMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MQ_QUERY
]
=
vnodeProcessReadMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_MQ_CONSUME
]
=
vnodeProcessReadMsg
;
// mq related end
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_QUERY
]
=
vnodeProcessReadMsg
;
tsVnode
.
msgFp
[
TSDB_MSG_TYPE_FETCH
]
=
vnodeProcessReadMsg
;
}
int32_t
vnodeInitMain
()
{
int32_t
vnodeInitMain
()
{
vnodeInitMsgFp
();
tsVnode
.
hash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
tsVnode
.
hash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tsVnode
.
hash
==
NULL
)
{
if
(
tsVnode
.
hash
==
NULL
)
{
vError
(
"failed to init vnode mgmt"
);
vError
(
"failed to init vnode mgmt"
);
...
@@ -654,11 +626,3 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
...
@@ -654,11 +626,3 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
}
}
}
}
}
}
void
vnodeProcessMsg
(
SRpcMsg
*
pMsg
)
{
if
(
tsVnode
.
msgFp
[
pMsg
->
msgType
])
{
(
*
tsVnode
.
msgFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
assert
(
0
);
}
}
source/server/vnode/src/vnodeWrite.c
浏览文件 @
930dca2a
...
@@ -15,8 +15,6 @@
...
@@ -15,8 +15,6 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "os.h"
#include "tqueue.h"
#include "tworker.h"
#include "vnodeMain.h"
#include "vnodeMain.h"
#include "vnodeWrite.h"
#include "vnodeWrite.h"
#include "vnodeWriteMsg.h"
#include "vnodeWriteMsg.h"
...
...
source/util/src/terror.c
浏览文件 @
930dca2a
...
@@ -245,11 +245,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, "No write permission f
...
@@ -245,11 +245,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, "No write permission f
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR
,
"Missing data file"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR
,
"Missing data file"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_OUT_OF_MEMORY
,
"Out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_OUT_OF_MEMORY
,
"Out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_APP_ERROR
,
"Unexpected generic error in vnode"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_APP_ERROR
,
"Unexpected generic error in vnode"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_
VRESION_FILE
,
"Invalid version
file"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_
CFG_FILE
,
"Invalid config
file"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_I
S_FULL
,
"Database memory is full for commit failed
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_I
NVALID_TERM_FILE
,
"Invalid term file
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_FLOWCTRL
,
"Database memory is full
for waiting commit
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_FLOWCTRL
,
"Database memory is full"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_DROPPING
,
"Database is dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_DROPPING
,
"Database is dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_
BALANCING
,
"Database is balanc
ing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_
UPDATING
,
"Database is updat
ing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_CLOSING
,
"Database is closing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_CLOSING
,
"Database is closing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NOT_SYNCED
,
"Database suspended"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NOT_SYNCED
,
"Database suspended"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_WRITE_AUTH
,
"Database write operation denied"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_WRITE_AUTH
,
"Database write operation denied"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录