提交 5aef8c57 编写于 作者: S Shengliang Guan

Re implement the TD-1925 code because it cannot merge

上级 cef85dc7
......@@ -13,7 +13,6 @@ ENDIF ()
SET(TD_ACCOUNT FALSE)
SET(TD_ADMIN FALSE)
SET(TD_GRANT FALSE)
SET(TD_SYNC TRUE)
SET(TD_MQTT TRUE)
SET(TD_TSDB_PLUGINS FALSE)
......
......@@ -13,10 +13,6 @@ IF (TD_GRANT)
ADD_DEFINITIONS(-D_GRANT)
ENDIF ()
IF (TD_SYNC)
ADD_DEFINITIONS(-D_SYNC)
ENDIF ()
IF (TD_MQTT)
ADD_DEFINITIONS(-D_MQTT)
ENDIF ()
......
......@@ -47,11 +47,6 @@ IF (${MQTT} MATCHES "false")
MESSAGE(STATUS "build without mqtt module")
ENDIF ()
IF (${SYNC} MATCHES "false")
SET(TD_SYNC FALSE)
MESSAGE(STATUS "build without sync module")
ENDIF ()
IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET(TD_RANDOM_FILE_FAIL TRUE)
MESSAGE(STATUS "build with random-file-fail enabled")
......
......@@ -10,9 +10,7 @@ ADD_SUBDIRECTORY(client)
ADD_SUBDIRECTORY(query)
ADD_SUBDIRECTORY(kit)
ADD_SUBDIRECTORY(plugins)
IF (TD_SYNC)
ADD_SUBDIRECTORY(sync)
ENDIF ()
ADD_SUBDIRECTORY(sync)
ADD_SUBDIRECTORY(balance)
ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY(vnode)
......
......@@ -35,6 +35,7 @@ extern int32_t tsNumOfMnodes;
extern int32_t tsEnableVnodeBak;
extern int32_t tsEnableTelemetryReporting;
extern char tsEmail[];
extern char tsArbitrator[];
// common
extern int tsRpcTimer;
......
......@@ -12,7 +12,7 @@ AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4)
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync)
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosd taos_static)
......@@ -32,10 +32,6 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES(taosd mqtt)
ENDIF ()
IF (TD_SYNC)
TARGET_LINK_LIBRARIES(taosd balance sync)
ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target")
ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitCfg();
void dnodeCleanupCfg();
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitCheck();
void dnodeCleanupCheck();
......
......@@ -19,8 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
#include "dnodeInt.h"
int32_t dnodeInitEps();
void dnodeCleanupEps();
......
......@@ -19,8 +19,13 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "taoserror.h"
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "tglobal.h"
#include "dnode.h"
#include "vnode.h"
extern int32_t dDebugFlag;
......
......@@ -19,8 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
#include "dnodeInt.h"
int32_t dnodeInitMInfos();
void dnodeCleanupMInfos();
......@@ -29,6 +28,10 @@ void dnodeUpdateEpSetForPeer(SRpcEpSet *pEpSet);
void dnodeGetMInfos(SMInfos *pMinfos);
bool dnodeIsMasterEp(char *ep);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
#ifdef __cplusplus
}
#endif
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitMPeer();
void dnodeCleanupMPeer();
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitMRead();
void dnodeCleanupMRead();
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitMWrite();
void dnodeCleanupMWrite();
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitSystem();
void dnodeCleanUpSystem();
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitModules();
void dnodeStartModules();
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitServer();
void dnodeCleanupServer();
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitShell();
void dnodeCleanupShell();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_STEP_H
#define TDENGINE_DNODE_STEP_H
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize);
void dnodeStepCleanup(SStep *pSteps, int32_t stepSize);
void dnodeReportStep(char *name, char *desc, int8_t finished);
void dnodeSendStartupStep(SRpcMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitTelemetry();
void dnodeCleanupTelemetry();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_VMGMT_H
#define TDENGINE_DNODE_VMGMT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitVMgmt();
void dnodeCleanupVMgmt();
void dnodeDispatchToVMgmtQueue(SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitVRead();
void dnodeCleanupVRead();
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitVWrite();
void dnodeCleanupVWrite();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_VNODES_H
#define TDENGINE_DNODE_VNODES_H
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitVnodes();
void dnodeCleanupVnodes();
int32_t dnodeInitTimer();
void dnodeCleanupTimer();
void dnodeSendStatusMsgToMnode();
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
......@@ -16,9 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeCfg.h"
static SDnodeCfg tsCfg = {0};
......
......@@ -15,8 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "dnodeInt.h"
#include "dnodeCheck.h"
typedef struct {
......
......@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "hash.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeEps.h"
static SDnodeEps *tsEps = NULL;
......
......@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMInfos.h"
static SMInfos tsMInfos;
......@@ -297,3 +294,25 @@ static int32_t dnodeWriteMInfos() {
dInfo("successed to write %s", file);
return 0;
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetEpSetForShell(&epSet);
} else {
dnodeGetEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
epSet.port[i] = htons(epSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &epSet);
}
\ No newline at end of file
......@@ -15,16 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "twal.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeVMgmt.h"
#include "dnodeMInfos.h"
#include "dnodeMWrite.h"
typedef struct {
......
......@@ -15,16 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "twal.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeVMgmt.h"
#include "dnodeMInfos.h"
#include "dnodeMRead.h"
typedef struct {
......
......@@ -15,17 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "ttimer.h"
#include "tqueue.h"
#include "twal.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeVMgmt.h"
#include "dnodeMInfos.h"
#include "dnodeMWrite.h"
typedef struct {
......
......@@ -16,15 +16,12 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taos.h"
#include "tutil.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tfile.h"
#include "twal.h"
#include "trpc.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
// #include "tfs.h"
#include "tsync.h"
#include "dnodeStep.h"
#include "dnodePeer.h"
#include "dnodeModule.h"
#include "dnodeEps.h"
......@@ -33,6 +30,8 @@
#include "dnodeCheck.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeVMgmt.h"
#include "dnodeVnodes.h"
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "dnodeMPeer.h"
......@@ -45,38 +44,32 @@ static int32_t dnodeInitStorage();
static void dnodeCleanupStorage();
static void dnodeSetRunStatus(SRunStatus status);
static void dnodeCheckDataDirOpenned(char *dir);
static int32_t dnodeInitComponents();
static void dnodeCleanupComponents(int32_t stepId);
static int dnodeCreateDir(const char *dir);
typedef struct {
const char *const name;
int32_t (*init)();
void (*cleanup)();
} SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = {
{"tfile", tfInit, tfCleanup},
{"rpc", rpcInit, rpcCleanup},
{"globalcfg" ,taosCheckGlobalCfg, NULL},
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnodeeps", dnodeInitEps, dnodeCleanupEps},
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVRead, dnodeCleanupVRead},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMRead, dnodeCleanupMRead},
{"mwrite", dnodeInitMWrite, dnodeCleanupMWrite},
{"mpeer", dnodeInitMPeer, dnodeCleanupMPeer},
{"client", dnodeInitClient, dnodeCleanupClient},
{"server", dnodeInitServer, dnodeCleanupServer},
{"mgmt", dnodeInitMgmt, dnodeCleanupMgmt},
{"modules", dnodeInitModules, dnodeCleanupModules},
{"mgmt-tmr", dnodeInitMgmtTimer, dnodeCleanupMgmtTimer},
{"shell", dnodeInitShell, dnodeCleanupShell},
{"telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
static SStep tsDnodeSteps[] = {
{"dnode-tfile", tfInit, tfCleanup},
{"dnode-rpc", rpcInit, rpcCleanup},
{"dnode-globalcfg", taosCheckGlobalCfg, NULL},
{"dnode-storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnode-cfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnode-eps", dnodeInitEps, dnodeCleanupEps},
{"dnode-minfos", dnodeInitMInfos, dnodeCleanupMInfos},
{"dnode-wal", walInit, walCleanUp},
{"dnode-sync", syncInit, syncCleanUp},
{"dnode-check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"dnode-vread", dnodeInitVRead, dnodeCleanupVRead},
{"dnode-vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"dnode-vmgmt", dnodeInitVMgmt, dnodeCleanupVMgmt},
{"dnode-mread", dnodeInitMRead, dnodeCleanupMRead},
{"dnode-mwrite", dnodeInitMWrite, dnodeCleanupMWrite},
{"dnode-mpeer", dnodeInitMPeer, dnodeCleanupMPeer},
{"dnode-client", dnodeInitClient, dnodeCleanupClient},
{"dnode-server", dnodeInitServer, dnodeCleanupServer},
{"dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes},
{"dnode-modules", dnodeInitModules, dnodeCleanupModules},
{"dnode-tmr", dnodeInitTimer, dnodeCleanupTimer},
{"dnode-shell", dnodeInitShell, dnodeCleanupShell},
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
};
static int dnodeCreateDir(const char *dir) {
......@@ -87,24 +80,14 @@ static int dnodeCreateDir(const char *dir) {
return 0;
}
static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
if (tsDnodeComponents[i].cleanup) {
(*tsDnodeComponents[i].cleanup)();
}
}
static void dnodeCleanupComponents() {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeSteps, stepSize);
}
static int32_t dnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]); i++) {
if (tsDnodeComponents[i].init() != 0) {
dnodeCleanupComponents(i);
code = -1;
break;
}
}
return code;
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeSteps, stepSize);
}
int32_t dnodeInitSystem() {
......@@ -151,7 +134,7 @@ int32_t dnodeInitSystem() {
void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_RUN_STATUS_STOPPED);
dnodeCleanupComponents(sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]) - 1);
dnodeCleanupComponents();
taos_cleanup();
taosCloseLog();
}
......
......@@ -15,15 +15,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "mnode.h"
#include "http.h"
#include "tmqtt.h"
#include "monitor.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeModule.h"
typedef struct {
......
......@@ -21,15 +21,12 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeVMgmt.h"
#include "dnodeVWrite.h"
#include "dnodeMPeer.h"
#include "dnodeMInfos.h"
#include "dnodeStep.h"
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *);
......@@ -44,19 +41,19 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort;
......@@ -91,8 +88,9 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
.pCont = NULL,
.contLen = 0
};
if (pMsg->pCont == NULL) return;
if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) return dnodeSendStartupStep(pMsg);
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) {
rspMsg.code = TSDB_CODE_APP_NOT_READY;
......
......@@ -15,20 +15,14 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tutil.h"
#include "http.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "dnodeShell.h"
#include "dnodeStep.h"
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *);
......@@ -74,6 +68,8 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
if (numOfThreads < 1) {
......@@ -142,7 +138,23 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
}
static int32_t dnodeAuthNettestUser(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, "nettestinternal") == 0) {
char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass);
*spi = 0;
*encrypt = 0;
*ckey = 0;
memcpy(secret, pass, TSDB_KEY_LEN);
dTrace("nettest user is authorized");
return 0;
}
return -1;
}
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (dnodeAuthNettestUser(user, spi, encrypt, secret, ckey) == 0) return 0;
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code;
......@@ -220,4 +232,4 @@ SStatisInfo dnodeGetStatisInfo() {
}
return info;
}
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "dnodeStep.h"
static SStartupStep tsStartupStep;
void dnodeReportStep(char *name, char *desc, int8_t finished) {
tstrncpy(tsStartupStep.name, name, sizeof(tsStartupStep.name));
tstrncpy(tsStartupStep.desc, desc, sizeof(tsStartupStep.desc));
tsStartupStep.finished = finished;
}
void dnodeSendStartupStep(SRpcMsg *pMsg) {
dInfo("nettest msg is received, cont:%s", (char *)pMsg->pCont);
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
memcpy(pStep, &tsStartupStep, sizeof(SStartupStep));
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
}
void taosStepCleanupImp(SStep *pSteps, int32_t stepId) {
for (int32_t step = stepId; step >= 0; step--) {
SStep *pStep = pSteps + step;
dDebug("step:%s will cleanup", pStep->name);
if (pStep->cleanupFp != NULL) {
(*pStep->cleanupFp)();
}
}
}
int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize) {
for (int32_t step = 0; step < stepSize; step++) {
SStep *pStep = pSteps + step;
if (pStep->initFp == NULL) continue;
dnodeReportStep(pStep->name, "Start initialization", 0);
int32_t code = (*pStep->initFp)();
if (code != 0) {
dDebug("step:%s will init", pStep->name);
taosStepCleanupImp(pSteps, step);
return code;
}
dnodeReportStep(pStep->name, "Initialization complete", step + 1 >= stepSize);
}
return 0;
}
void dnodeStepCleanup(SStep *pSteps, int32_t stepSize) {
return taosStepCleanupImp(pSteps, stepSize - 1);
}
\ No newline at end of file
......@@ -16,9 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tgrant.h"
#include "tutil.h"
#include "tglobal.h"
#include "dnodeInt.h"
#include "dnodeMain.h"
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
......
......@@ -15,9 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "tutil.h"
#include "osTime.h"
#include "tsocket.h"
#include "tbuffer.h"
......@@ -32,8 +29,6 @@
#include "mnodeTable.h"
#include "mnodeSdb.h"
#include "mnodeAcct.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeTelemetry.h"
static tsem_t tsExitSem;
......@@ -313,4 +308,4 @@ void dnodeCleanupTelemetry() {
pthread_join(tsTelemetryThread, NULL);
tsem_destroy(&tsExitSem);
}
}
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tqueue.h"
#include "dnodeVMgmt.h"
typedef struct {
SRpcMsg rpcMsg;
char pCont[];
} SMgmtMsg;
static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread;
static void * dnodeProcessMgmtQueue(void *param);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitVMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
int32_t code = vnodeInitMgmt();
if (code != TSDB_CODE_SUCCESS) return -1;
tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) {
dError("failed to create the vmgmt queue set");
return -1;
}
tsMgmtQueue = taosOpenQueue();
if (tsMgmtQueue == NULL) {
dError("failed to create the vmgmt queue");
return -1;
}
taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL);
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process vmgmt queue, reason:%s", strerror(errno));
return -1;
}
dInfo("dnode vmgmt is initialized");
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupVMgmt() {
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset);
if (tsQthread) pthread_join(tsQthread, NULL);
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
if (tsMgmtQset) taosCloseQset(tsMgmtQset);
tsMgmtQset = NULL;
tsMgmtQueue = NULL;
vnodeCleanupMgmt();
}
static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
int32_t size = sizeof(SMgmtMsg) + pMsg->contLen;
SMgmtMsg *pMgmt = taosAllocateQitem(size);
if (pMgmt == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;
pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont;
memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt);
return TSDB_CODE_SUCCESS;
}
void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) {
int32_t code = dnodeWriteToMgmtQueue(pMsg);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
}
static void *dnodeProcessMgmtQueue(void *param) {
SMgmtMsg *pMgmt;
SRpcMsg * pMsg;
SRpcMsg rsp = {0};
int32_t qtype;
void * handle;
while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
break;
}
pMsg = &pMgmt->rpcMsg;
dDebug("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else {
rsp.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
}
dDebug("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
if (rsp.code != TSDB_CODE_DND_ACTION_IN_PROGRESS) {
rsp.handle = pMsg->handle;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
}
taosFreeQitem(pMsg);
}
return NULL;
}
static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
}
return pCreate;
}
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquire(pCreate->cfg.vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
} else {
dDebug("vgId:%d, create vnode msg is received", pCreate->cfg.vgId);
return vnodeCreate(pCreate);
}
}
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquire(pAlter->cfg.vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pAlter);
vnodeRelease(pVnode);
return code;
} else {
dError("vgId:%d, vnode not exist, can't alter it", pAlter->cfg.vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
}
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return vnodeDrop(pDrop->vgId);
}
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
return 0;
}
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config);
}
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dDebug("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dDebug("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
}
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum);
for (int i = 0; i < pCfg->mnodes.mnodeNum; ++i) {
pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId);
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
}
dnodeStartMnode(&pCfg->mnodes);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -15,12 +15,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tqueue.h"
#include "vnode.h"
#include "dnodeInt.h"
#include "dnodeVRead.h"
typedef struct {
pthread_t thread; // thread
......
......@@ -15,13 +15,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tqueue.h"
#include "twal.h"
#include "vnode.h"
#include "dnodeInt.h"
#include "dnodeVWrite.h"
typedef struct {
taos_qall qall;
......
......@@ -15,28 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "ttimer.h"
#include "tsdb.h"
#include "twal.h"
#include "tqueue.h"
#include "tsync.h"
#include "ttimer.h"
#include "tbn.h"
#include "tglobal.h"
#include "dnode.h"
#include "vnode.h"
#include "mnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeEps.h"
#include "dnodeCfg.h"
#include "dnodeMInfos.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeModule.h"
#include "dnodeVnodes.h"
typedef struct {
pthread_t thread;
......@@ -47,110 +30,30 @@ typedef struct {
int32_t * vnodeList;
} SOpenVnodeThread;
typedef struct {
SRpcMsg rpcMsg;
char pCont[];
} SMgmtMsg;
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread;
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void *dnodeProcessMgmtQueue(void *param);
static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes();
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
tsRebootTime = taosGetTimestampSec();
int32_t code = vnodeInitResources();
if (code != TSDB_CODE_SUCCESS) {
dnodeCleanupMgmt();
return -1;
}
// create the queue and thread to handle the message
tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) {
dError("failed to create the mgmt queue set");
dnodeCleanupMgmt();
return -1;
}
tsMgmtQueue = taosOpenQueue();
if (tsMgmtQueue == NULL) {
dError("failed to create the mgmt queue");
dnodeCleanupMgmt();
return -1;
}
taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL);
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime = 0;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
dnodeCleanupMgmt();
return -1;
}
code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) {
dnodeCleanupMgmt();
return -1;
}
dInfo("dnode mgmt is initialized");
return TSDB_CODE_SUCCESS;
}
int32_t dnodeInitMgmtTimer() {
int32_t dnodeInitTimer() {
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
dnodeCleanupMgmt();
return -1;
}
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
tsRebootTime = taosGetTimestampSec();
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
dInfo("dnode mgmt timer is initialized");
return TSDB_CODE_SUCCESS;
}
void dnodeSendStatusMsgToMnode() {
if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
dInfo("force send status msg to mnode");
taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
}
dInfo("dnode timer is initialized");
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupMgmtTimer() {
void dnodeCleanupTimer() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
......@@ -162,82 +65,9 @@ void dnodeCleanupMgmtTimer() {
}
}
void dnodeCleanupMgmt() {
dnodeCleanupMgmtTimer();
dnodeCloseVnodes();
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset);
if (tsQthread) pthread_join(tsQthread, NULL);
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
if (tsMgmtQset) taosCloseQset(tsMgmtQset);
tsMgmtQset = NULL;
tsMgmtQueue = NULL;
vnodeCleanupResources();
}
static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
int32_t size = sizeof(SMgmtMsg) + pMsg->contLen;
SMgmtMsg *pMgmt = taosAllocateQitem(size);
if (pMgmt == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont;
memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt);
return TSDB_CODE_SUCCESS;
}
void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
int32_t code = dnodeWriteToMgmtQueue(pMsg);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
}
static void *dnodeProcessMgmtQueue(void *param) {
SMgmtMsg *pMgmt;
SRpcMsg * pMsg;
SRpcMsg rsp = {0};
int32_t qtype;
void * handle;
while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
break;
}
pMsg = &pMgmt->rpcMsg;
dDebug("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else {
rsp.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
}
rsp.handle = pMsg->handle;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
taosFreeQitem(pMsg);
}
return NULL;
}
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) {
return TSDB_CODE_DND_NO_WRITE_ACCESS;
}
if (dir == NULL) return TSDB_CODE_DND_NO_WRITE_ACCESS;
*numOfVnodes = 0;
struct dirent *de = NULL;
......@@ -265,14 +95,12 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
static void *dnodeOpenVnode(void *param) {
SOpenVnodeThread *pThread = param;
char vnodeDir[TSDB_FILENAME_LEN * 3];
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
if (vnodeOpen(vgId, vnodeDir) < 0) {
if (vnodeOpen(vgId) < 0) {
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
......@@ -286,7 +114,7 @@ static void *dnodeOpenVnode(void *param) {
return NULL;
}
static int32_t dnodeOpenVnodes() {
int32_t dnodeInitVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
......@@ -349,7 +177,7 @@ static int32_t dnodeOpenVnodes() {
return TSDB_CODE_SUCCESS;
}
static void dnodeCloseVnodes() {
void dnodeCleanupVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES]= {0};
int32_t numOfVnodes = 0;
int32_t status;
......@@ -368,107 +196,6 @@ static void dnodeCloseVnodes() {
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
}
return pCreate;
}
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquire(pCreate->cfg.vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
} else {
dDebug("vgId:%d, create vnode msg is received", pCreate->cfg.vgId);
return vnodeCreate(pCreate);
}
}
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquire(pAlter->cfg.vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pAlter);
vnodeRelease(pVnode);
return code;
} else {
dError("vgId:%d, vnode not exist, can't alter it", pAlter->cfg.vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
}
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return vnodeDrop(pDrop->vgId);
}
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
// SAlterStreamMsg *pStream = pCont;
// pStream->uid = htobe64(pStream->uid);
// pStream->stime = htobe64(pStream->stime);
// pStream->vnode = htonl(pStream->vnode);
// pStream->sid = htonl(pStream->sid);
// pStream->status = htonl(pStream->status);
//
// int32_t code = dnodeCreateStream(pStream);
return 0;
}
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config);
}
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dDebug("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dDebug("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
}
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum);
for (int i = 0; i < pCfg->mnodes.mnodeNum; ++i) {
pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId);
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
}
dnodeStartMnode(&pCfg->mnodes);
return TSDB_CODE_SUCCESS;
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code));
......@@ -477,8 +204,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
}
SStatusRsp *pStatusRsp = pMsg->pCont;
SMInfos *pMinfos = &pStatusRsp->mnodes;
dnodeUpdateMInfos(pMinfos);
SMInfos *minfos = &pStatusRsp->mnodes;
dnodeUpdateMInfos(minfos);
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
......@@ -538,11 +265,11 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {
.pCont = pStatus,
.contLen = contLen,
......@@ -554,24 +281,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
dnodeSendMsgToDnode(&epSet, &rpcMsg);
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetEpSetForShell(&epSet);
} else {
dnodeGetEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
for (int i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
epSet.port[i] = htons(epSet.port[i]);
void dnodeSendStatusMsgToMnode() {
if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
dInfo("force send status msg to mnode");
taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
}
rpcSendRedirectRsp(rpcMsg->handle, &epSet);
}
}
\ No newline at end of file
......@@ -71,8 +71,18 @@ void dnodeDelayReprocessMWriteMsg(void *pMsg);
void dnodeSendStatusMsgToMnode();
typedef struct {
char *name;
int32_t (*initFp)();
void (*cleanupFp)();
} SStep;
int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize);
void dnodeStepCleanup(SStep *pSteps, int32_t stepSize);
void dnodeReportStep(char *name, char *desc, int8_t finished);
#ifdef __cplusplus
}
#endif
#endif
#endif
\ No newline at end of file
......@@ -286,6 +286,9 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
#define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_PORT_LEN 8
#define TSDB_MQTT_USER_LEN 24
......@@ -439,6 +442,10 @@ typedef enum {
TAOS_QTYPE_QUERY = 4
} EQType;
#define TSDB_MAX_TIERS 3
#define TSDB_MAX_DISKS_PER_TIER 16
#define TSDB_MAX_DISKS (TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER)
typedef enum {
TSDB_SUPER_TABLE = 0, // super table
TSDB_CHILD_TABLE = 1, // table created from super table
......
......@@ -192,6 +192,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, 0, 0x0400, "Message no
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "Dnode out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "No permission for disk files in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "Invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, 0, 0x0404, "Action in progress")
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "Action in progress")
......@@ -394,6 +395,18 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_BAD_SEQ, 0, 0x2113, "src bad se
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_INCOMPLETE, 0, 0x2114, "src incomplete")
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_GENERAL, 0, 0x2115, "src general")
// tfs
TAOS_DEFINE_ERROR(TSDB_CODE_FS_OUT_OF_MEMORY, 0, 0x2200, "tfs out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, 0, 0x2201, "tfs invalid mount config")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, 0, 0x2202, "tfs too many mount")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, 0, 0x2203, "tfs duplicate primary mount")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_PRIMARY_DISK, 0, 0x2204, "tfs no primary mount")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_MOUNT_AT_TIER, 0, 0x2205, "tfs no mount at tier")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, 0, 0x2206, "tfs file already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, 0, 0x2207, "tfs invalid level")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, 0, 0x2208, "tfs no valid disk")
#ifdef TAOS_ERROR_C
};
#endif
......
......@@ -105,10 +105,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "network-test" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
#ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105
......@@ -840,6 +837,14 @@ typedef struct {
char ckey[TSDB_KEY_LEN];
} SAuthMsg, SAuthRsp;
typedef struct {
int8_t finished;
int8_t reserved1[7];
char name[TSDB_STEP_NAME_LEN];
char desc[TSDB_STEP_DESC_LEN];
char reserved2[64];
} SStartupStep;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -321,7 +321,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
*/
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
int tsdbInitCommitQueue(int nthreads);
int tsdbInitCommitQueue();
void tsdbDestroyCommitQueue();
int tsdbSyncCommit(TSDB_REPO_T *repo);
void tsdbIncCommitRef(int vgId);
......
......@@ -19,18 +19,9 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "trpc.h"
#include "twal.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3,
TAOS_VN_STATUS_RESET = 4,
} EVnodeStatus;
typedef struct {
int32_t len;
void * rsp;
......@@ -60,29 +51,35 @@ typedef struct {
SWalHead pHead[];
} SVWriteMsg;
// vnodeStatus
extern char *vnodeStatus[];
// vnodeMain
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
void* vnodeAcquire(int32_t vgId); // add refcount
void vnodeRelease(void *pVnode); // dec refCount
// vnodeMgmt
int32_t vnodeInitMgmt();
void vnodeCleanupMgmt();
void* vnodeAcquire(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeGetWal(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *pStatus);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
// vnodeWrite
int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *pStatus);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources();
void vnodeCleanupResources();
// vnodeSync
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
// vnodeRead
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
......@@ -91,4 +88,4 @@ int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
}
#endif
#endif
#endif
\ No newline at end of file
......@@ -51,7 +51,6 @@ typedef struct SShellArguments {
char* commands;
int abort;
int port;
int endPort;
int pktLen;
char* netTestRole;
} SShellArguments;
......
......@@ -32,14 +32,14 @@
/**************** Global variables ****************/
#ifdef _TD_POWER_
char CLIENT_VERSION[] = "Welcome to the PowerDB shell from %s, Client Version:%s\n"
"Copyright (c) 2017 by PowerDB, Inc. All rights reserved.\n\n";
"Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n";
char PROMPT_HEADER[] = "power> ";
char CONTINUE_PROMPT[] = " -> ";
int prompt_size = 7;
#else
char CLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
"Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n";
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
char PROMPT_HEADER[] = "taos> ";
char CONTINUE_PROMPT[] = " -> ";
......
......@@ -46,8 +46,7 @@ static struct argp_option options[] = {
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|clients|server."},
{"endport", 'e', "ENDPORT", 0, "Net test end port, default is 6042."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|server|rpc|startup."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{0}};
......@@ -130,20 +129,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'd':
arguments->database = arg;
break;
case 'n':
arguments->netTestRole = arg;
break;
case 'e':
if (arg) {
arguments->endPort = atoi(arg);
} else {
fprintf(stderr, "Invalid end port\n");
return -1;
}
break;
case 'l':
if (arg) {
arguments->pktLen = atoi(arg);
......@@ -152,7 +140,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return -1;
}
break;
case OPT_ABORT:
arguments->abort = 1;
break;
......
......@@ -61,8 +61,7 @@ SShellArguments args = {
.file = "\0",
.dir = "\0",
.threadNum = 5,
.commands = NULL,
.endPort = 6042,
.commands = NULL,
.pktLen = 1000,
.netTestRole = NULL
};
......@@ -81,9 +80,7 @@ int main(int argc, char* argv[]) {
if (args.netTestRole && args.netTestRole[0] != 0) {
taos_init();
CmdArguments cmdArgs;
memcpy(&cmdArgs, &args, sizeof(SShellArguments));
taosNetTest(&cmdArgs);
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen);
exit(0);
}
......
......@@ -317,13 +317,6 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
#ifndef _SYNC
if (pCfg->replications != 1) {
mError("invalid db option replications:%d can only be 1 in this version", pCfg->replications);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
#endif
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) {
mError("invalid db option update:%d valid range: [%d, %d]", pCfg->update, TSDB_MIN_DB_UPDATE, TSDB_MAX_DB_UPDATE);
return TSDB_CODE_MND_INVALID_DB_OPTION;
......
......@@ -111,9 +111,6 @@ static int32_t mnodeDnodeActionInsert(SSdbRow *pRow) {
static int32_t mnodeDnodeActionDelete(SSdbRow *pRow) {
SDnodeObj *pDnode = pRow->pObj;
#ifndef _SYNC
mnodeDropAllDnodeVgroups(pDnode);
#endif
mnodeDropMnodeLocal(pDnode->dnodeId);
bnNotify();
mnodeUpdateDnodeEps();
......@@ -705,11 +702,7 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) {
mInfo("dnode:%d, start to drop it", pDnode->dnodeId);
#ifndef _SYNC
int32_t code = mnodeDropDnode(pDnode, pMsg);
#else
int32_t code = bnDropDnode(pDnode);
#endif
mnodeDecDnodeRef(pDnode);
return code;
}
......@@ -1179,58 +1172,3 @@ static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) {
default:return "any";
}
}
#ifndef _SYNC
int32_t bnInit() { return TSDB_CODE_SUCCESS; }
void bnCleanUp() {}
void bnNotify() {}
void bnCheckModules() {}
void bnReset() {}
int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; }
char* syncRole[] = {
"offline",
"unsynced",
"syncing",
"slave",
"master"
};
int32_t bnAllocVnodes(SVgObj *pVgroup) {
void * pIter = NULL;
SDnodeObj *pDnode = NULL;
SDnodeObj *pSelDnode = NULL;
float vnodeUsage = 1000.0;
while (1) {
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (pDnode->numOfCores > 0 && pDnode->openVnodes < TSDB_MAX_VNODES) {
float openVnodes = pDnode->openVnodes;
if (pDnode->isMgmt) openVnodes += tsMnodeEqualVnodeNum;
float usage = openVnodes / pDnode->numOfCores;
if (usage <= vnodeUsage) {
pSelDnode = pDnode;
vnodeUsage = usage;
}
}
mnodeDecDnodeRef(pDnode);
}
if (pSelDnode == NULL) {
mError("failed to alloc vnode to vgroup");
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
}
pVgroup->vnodeGid[0].dnodeId = pSelDnode->dnodeId;
pVgroup->vnodeGid[0].pDnode = pSelDnode;
mDebug("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
return TSDB_CODE_SUCCESS;
}
#endif
......@@ -37,16 +37,10 @@
#include "mnodeShow.h"
#include "mnodeProfile.h"
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
} SMnodeComponent;
void *tsMnodeTmr = NULL;
static bool tsMgmtIsRunning = false;
static const SMnodeComponent tsMnodeComponents[] = {
static SStep tsMnodeSteps[] = {
{"sdbref", sdbInitRef, sdbCleanUpRef},
{"profile", mnodeInitProfile, mnodeCleanupProfile},
{"cluster", mnodeInitCluster, mnodeCleanupCluster},
......@@ -67,22 +61,14 @@ static void mnodeInitTimer();
static void mnodeCleanupTimer();
static bool mnodeNeedStart() ;
static void mnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
tsMnodeComponents[i].cleanup();
}
static void mnodeCleanupComponents() {
int32_t stepSize = sizeof(tsMnodeSteps) / sizeof(SStep);
dnodeStepCleanup(tsMnodeSteps, stepSize);
}
static int32_t mnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) {
if (tsMnodeComponents[i].init() != 0) {
mnodeCleanupComponents(i);
code = -1;
break;
}
}
return code;
int32_t stepSize = sizeof(tsMnodeSteps) / sizeof(SStep);
return dnodeStepInit(tsMnodeSteps, stepSize);
}
int32_t mnodeStartSystem() {
......@@ -132,7 +118,7 @@ void mnodeCleanupSystem() {
dnodeFreeMReadQueue();
dnodeFreeMPeerQueue();
mnodeCleanupTimer();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mnodeCleanupComponents();
mInfo("mnode is cleaned up");
}
......
......@@ -1086,13 +1086,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
}
} else { // msg is passed to app only parsing is ok
if (pHead->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
rpcSendQuickRsp(pConn, TSDB_CODE_SUCCESS);
rpcFreeMsg(pRecv->msg);
return pConn;
}
rpcProcessIncomingMsg(pConn, pHead, pContext);
}
}
......
......@@ -14,6 +14,7 @@
*/
#include "os.h"
#include "tglobal.h"
#include "tlist.h"
#include "tref.h"
#include "tsdbMain.h"
......@@ -36,7 +37,8 @@ static void *tsdbLoopCommit(void *arg);
SCommitQueue tsCommitQueue = {0};
int tsdbInitCommitQueue(int nthreads) {
int tsdbInitCommitQueue() {
int nthreads = tsNumOfCommitThreads;
SCommitQueue *pQueue = &tsCommitQueue;
if (nthreads < 1) nthreads = 1;
......
......@@ -20,27 +20,7 @@
extern "C" {
#endif
typedef struct CmdArguments {
char* host;
char* password;
char* user;
char* auth;
char* database;
char* timezone;
bool is_raw_time;
bool is_use_passwd;
char file[TSDB_FILENAME_LEN];
char dir[TSDB_FILENAME_LEN];
int threadNum;
char* commands;
int abort;
int port;
int endPort;
int pktLen;
char* netTestRole;
} CmdArguments;
void taosNetTest(CmdArguments* args);
void taosNetTest(char *role, char *host, int port, int pkgLen);
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -358,8 +358,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
if (queue->head) {
pNode = queue->head;
*pitem = pNode->item;
*type = pNode->type;
*phandle = queue->ahandle;
if (type) *type = pNode->type;
if (phandle) *phandle = queue->ahandle;
queue->head = pNode->next;
if (queue->head == NULL)
queue->tail = NULL;
......
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeReadCfg(SVnodeObj *pVnode);
int32_t vnodeWriteCfg(SCreateVnodeMsg *pVnodeCfg);
......
......@@ -19,11 +19,11 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
#include "tsync.h"
#include "twal.h"
#include "tcq.h"
#include "tsdb.h"
#include "vnode.h"
extern int32_t vDebugFlag;
......@@ -35,39 +35,37 @@ extern int32_t vDebugFlag;
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t flowctrlLevel;
int8_t status;
int8_t role;
int8_t accessState;
int8_t isFull;
int8_t isCommiting;
uint64_t version; // current version
uint64_t fversion; // version on saved data file
void *wqueue;
void *rqueue;
void *wal;
void *tsdb;
int64_t sync;
void *events;
void *cq; // continuous query
int32_t cfgVersion;
STsdbCfg tsdbCfg;
SSyncCfg syncCfg;
SWalCfg walCfg;
void *qMgmt;
char *rootDir;
tsem_t sem;
int8_t dropped;
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t flowctrlLevel;
int8_t status;
int8_t role;
int8_t accessState;
int8_t isFull;
int8_t isCommiting;
uint64_t version; // current version
uint64_t fversion; // version on saved data file
void * wqueue;
void * rqueue;
void * wal;
void * tsdb;
int64_t sync;
void * events;
void * cq; // continuous query
int32_t cfgVersion;
STsdbCfg tsdbCfg;
SSyncCfg syncCfg;
SWalCfg walCfg;
void * qMgmt;
char * rootDir;
tsem_t sem;
int8_t dropped;
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex;
} SVnodeObj;
void vnodeInitWriteFp(void);
void vnodeInitReadFp(void);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_MAIN_H
#define TDENGINE_VNODE_MAIN_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
int32_t vnodeReset(SVnodeObj *pVnode);
void vnodeDestroy(SVnodeObj *pVnode);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_MGMT_H
#define TDENGINE_VNODE_MGMT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitMgmt();
void vnodeCleanupMgmt();
void* vnodeAcquire(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeGetWal(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *pStatus);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
void vnodeAddIntoHash(SVnodeObj* pVnode);
void vnodeRemoveFromHash(SVnodeObj * pVnode);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_READ_H
#define TDENGINE_VNODE_READ_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitRead(void);
void vnodeCleanupRead(void);
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_STATUS_H
#define TDENGINE_VNODE_STATUS_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3,
TAOS_VN_STATUS_RESET = 4,
} EVnodeStatus;
bool vnodeSetInitStatus(SVnodeObj* pVnode);
bool vnodeSetReadyStatus(SVnodeObj* pVnode);
bool vnodeSetClosingStatus(SVnodeObj* pVnode);
bool vnodeSetUpdatingStatus(SVnodeObj* pVnode);
bool vnodeSetResetStatus(SVnodeObj* pVnode);
bool vnodeInInitStatus(SVnodeObj* pVnode);
bool vnodeInReadyStatus(SVnodeObj* pVnode);
bool vnodeInClosingStatus(SVnodeObj* pVnode);
bool vnodeInResetStatus(SVnodeObj* pVnode);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_SYNC_H
#define TDENGINE_VNODE_SYNC_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver);
int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
void vnodeNotifyRole(int32_t vgId, int8_t role);
void vnodeCtrlFlow(int32_t vgId, int32_t level);
int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
......@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeReadVersion(SVnodeObj *pVnode);
int32_t vnodeSaveVersion(SVnodeObj *pVnode);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_WORKER_H
#define TDENGINE_VNODE_WORKER_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitMWorker();
void vnodeCleanupMWorker();
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle);
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_WRITE_H
#define TDENGINE_VNODE_WRITE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitWrite(void);
void vnodeCleanupWrite(void);
int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
......@@ -15,13 +15,9 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "cJSON.h"
#include "tglobal.h"
#include "tsdb.h"
#include "dnode.h"
#include "vnodeInt.h"
#include "vnodeCfg.h"
static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
......
......@@ -18,77 +18,17 @@
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "trpc.h"
#include "tsdb.h"
#include "tutil.h"
#include "vnode.h"
#include "vnodeInt.h"
// #include "tfs.h"
#include "query.h"
#include "dnode.h"
#include "vnodeCfg.h"
#include "vnodeStatus.h"
#include "vnodeSync.h"
#include "vnodeVersion.h"
#include "vnodeMgmt.h"
static SHashObj*tsVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(int32_t vgId, int8_t role);
static void vnodeCtrlFlow(int32_t vgId, int32_t level);
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
#ifndef _SYNC
int64_t syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype) { return 0; }
void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg *cfg) { return 0; }
int32_t syncGetNodesRole(int64_t rid, SNodesRole *cfg) { return 0; }
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {}
#endif
char* vnodeStatus[] = {
"init",
"ready",
"closing",
"updating",
"reset"
};
int32_t vnodeInitResources() {
int32_t code = syncInit();
if (code != 0) return code;
vnodeInitWriteFp();
vnodeInitReadFp();
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnodesHash == NULL) {
vError("failed to init vnode list");
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) {
vError("failed to init vnode commit queue");
return terrno;
}
return TSDB_CODE_SUCCESS;
}
void vnodeCleanupResources() {
tsdbDestroyCommitQueue();
if (tsVnodesHash != NULL) {
vDebug("vnode list is cleanup");
taosHashCleanup(tsVnodesHash);
tsVnodesHash = NULL;
}
syncCleanUp();
}
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
int32_t code;
......@@ -155,7 +95,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
vInfo("vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel,
pVnodeCfg->cfg.fsyncPeriod);
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
code = vnodeOpen(pVnodeCfg->cfg.vgId);
return code;
}
......@@ -176,73 +116,81 @@ int32_t vnodeDrop(int32_t vgId) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = vparam;
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
int32_t code = vnodeWriteCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = walAlter(pVnode->wal, &pVnode->walCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
}
if (pVnode->tsdb) {
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
return code;
}
}
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is altered", pVnode->vgId);
return 0;
}
return TSDB_CODE_SUCCESS;
int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = vparam;
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg
if (!vnodeSetUpdatingStatus(pVnode)) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
int32_t code = vnodeAlterImp(pVnode, pVnodeCfg);
vnodeSetReadyStatus(pVnode);
if (code != 0) {
vError("vgId:%d, failed to alter vnode, code:0x%x", pVnode->vgId, code);
} else {
vDebug("vgId:%d, vnode is altered", pVnode->vgId);
}
return code;
}
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN];
int32_t vnodeOpen(int32_t vgId) {
char temp[TSDB_FILENAME_LEN * 3];
char rootDir[TSDB_FILENAME_LEN * 2];
snprintf(rootDir, TSDB_FILENAME_LEN * 2, "%s/vnode%d", tsVnodeDir, vgId);
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
if (pVnode == NULL) {
vError("vgId:%d, failed to open vnode since no enough memory", vnode);
vError("vgId:%d, failed to open vnode since no enough memory", vgId);
return TAOS_SYSTEM_ERROR(errno);
}
atomic_add_fetch_32(&pVnode->refCount, 1);
pVnode->vgId = vnode;
pVnode->status = TAOS_VN_STATUS_INIT;
pVnode->vgId = vgId;
pVnode->fversion = 0;
pVnode->version = 0;
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
pVnode->rootDir = strdup(rootDir);
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
tsem_init(&pVnode->sem, 0, 0);
pthread_mutex_init(&pVnode->statusMutex, NULL);
vnodeSetInitStatus(pVnode);
int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
......@@ -272,7 +220,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
sprintf(cqCfg.user, "_root");
strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode;
cqCfg.vgId = vgId;
cqCfg.cqWrite = vnodeWriteToCache;
pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) {
......@@ -341,13 +289,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
vnodeAddIntoHash(pVnode);
SSyncInfo syncInfo;
syncInfo.vgId = pVnode->vgId;
syncInfo.version = pVnode->version;
syncInfo.syncCfg = pVnode->syncCfg;
sprintf(syncInfo.path, "%s", rootDir);
tstrncpy(syncInfo.path, rootDir, TSDB_FILENAME_LEN);
syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToCache;
......@@ -358,18 +306,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.getVersion = vnodeGetVersion;
pVnode->sync = syncStart(&syncInfo);
#ifndef _SYNC
pVnode->role = TAOS_SYNC_ROLE_MASTER;
#else
if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeCleanUp(pVnode);
return terrno;
}
#endif
pVnode->status = TAOS_VN_STATUS_READY;
vnodeSetReadyStatus(pVnode);
return TSDB_CODE_SUCCESS;
}
......@@ -384,24 +328,9 @@ int32_t vnodeClose(int32_t vgId) {
return 0;
}
void vnodeRelease(void *vparam) {
if (vparam == NULL) return;
SVnodeObj *pVnode = vparam;
int32_t code = 0;
int32_t vgId = pVnode->vgId;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode);
assert(refCount >= 0);
if (refCount > 0) {
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount <= 3) {
tsem_post(&pVnode->sem);
}
return;
}
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode);
void vnodeDestroy(SVnodeObj *pVnode) {
int32_t code = 0;
int32_t vgId = pVnode->vgId;
if (pVnode->qMgmt) {
qCleanupQueryMgmt(pVnode->qMgmt);
......@@ -464,121 +393,20 @@ void vnodeRelease(void *vparam) {
}
tsem_destroy(&pVnode->sem);
pthread_mutex_destroy(&pVnode->statusMutex);
free(pVnode);
tsdbDecCommitRef(vgId);
int32_t count = taosHashGetSize(tsVnodesHash);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
}
static void vnodeIncRef(void *ptNode) {
assert(ptNode != NULL);
SVnodeObj **ppVnode = (SVnodeObj **)ptNode;
assert(ppVnode);
assert(*ppVnode);
SVnodeObj *pVnode = *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
}
void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vDebug("vgId:%d, not exist", vgId);
return NULL;
}
return *ppVnode;
}
void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
}
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
int64_t totalStorage = 0;
int64_t compStorage = 0;
int64_t pointsWritten = 0;
if (pVnode->status != TAOS_VN_STATUS_READY) return;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
if (pVnode->tsdb) {
tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
}
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
pLoad->totalStorage = htobe64(totalStorage);
pLoad->compStorage = htobe64(compStorage);
pLoad->pointsWritten = htobe64(pointsWritten);
pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica;
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
void *pIter = taosHashIterate(tsVnodesHash, NULL);
while (pIter) {
SVnodeObj **pVnode = pIter;
if (*pVnode) {
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = (*pVnode)->vgId;
}
}
pIter = taosHashIterate(tsVnodesHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
void vnodeBuildStatusMsg(void *param) {
SStatusMsg *pStatus = param;
void *pIter = taosHashIterate(tsVnodesHash, NULL);
while (pIter) {
SVnodeObj **pVnode = pIter;
if (*pVnode) {
vnodeBuildVloadMsg(*pVnode, pStatus);
}
pIter = taosHashIterate(tsVnodesHash, pIter);
}
}
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId);
if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
}
vnodeRelease(pVnode);
}
}
}
static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed
taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t));
vnodeRemoveFromHash(pVnode);
if (pVnode->status != TAOS_VN_STATUS_INIT) {
if (!vnodeInInitStatus(pVnode)) {
// it may be in updateing or reset state, then it shall wait
int32_t i = 0;
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) !=
TAOS_VN_STATUS_READY) {
while (!vnodeSetClosingStatus(pVnode)) {
if (++i % 1000 == 0) {
sched_yield();
}
......@@ -614,7 +442,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
pVnode->isCommiting = 1;
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (pVnode->status != TAOS_VN_STATUS_INIT) {
if (!vnodeInInitStatus(pVnode)) {
return walRenew(pVnode->wal);
}
return 0;
......@@ -624,7 +452,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
pVnode->isCommiting = 0;
pVnode->isFull = 0;
if (pVnode->status != TAOS_VN_STATUS_INIT) {
if (!vnodeInInitStatus(pVnode)) {
walRemoveOneOldFile(pVnode->wal);
}
return vnodeSaveVersion(pVnode);
......@@ -633,75 +461,12 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return 0;
}
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size,
uint64_t *fversion) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while get file info", vgId);
return 0;
}
*fversion = pVnode->fversion;
uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
vnodeRelease(pVnode);
return ret;
}
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while get wal info", vgId);
return -1;
}
int32_t code = walGetWalFile(pVnode->wal, fileName, fileId);
vnodeRelease(pVnode);
return code;
}
static void vnodeNotifyRole(int32_t vgId, int8_t role) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vTrace("vgId:%d, vnode not found while notify role", vgId);
return;
}
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
pVnode->role = role;
dnodeSendStatusMsgToMnode();
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) {
cqStart(pVnode->cq);
} else {
cqStop(pVnode->cq);
}
vnodeRelease(pVnode);
}
static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
return;
}
if (pVnode->flowctrlLevel != level) {
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
pVnode->flowctrlLevel = level;
}
vnodeRelease(pVnode);
}
static int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
int32_t vnodeReset(SVnodeObj *pVnode) {
char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) {
pVnode->status = TAOS_VN_STATUS_RESET;
if (!vnodeSetResetStatus(pVnode)) {
return -1;
}
void *tsdb = pVnode->tsdb;
......@@ -724,70 +489,8 @@ static int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
appH.cqDropFunc = cqDrop;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
pVnode->status = TAOS_VN_STATUS_READY;
vnodeSetReadyStatus(pVnode);
vnodeRelease(pVnode);
return 0;
}
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify file synced", vgId);
return 0;
}
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
int32_t code = vnodeResetTsdb(pVnode);
vnodeRelease(pVnode);
return code;
}
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
void *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while confirm forward", vgId);
return;
}
dnodeSendRpcVWriteRsp(pVnode, wparam, code);
vnodeRelease(pVnode);
}
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam);
vnodeRelease(pVnode);
return code;
}
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
return -1;
}
int32_t code = 0;
if (pVnode->isCommiting) {
vDebug("vgId:%d, vnode is commiting while get version", vgId);
code = -1;
} else {
*fver = pVnode->fversion;
*wver = pVnode->version;
}
vnodeRelease(pVnode);
return code;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "dnode.h"
#include "vnodeStatus.h"
#include "vnodeWorker.h"
#include "vnodeRead.h"
#include "vnodeWrite.h"
#include "vnodeMain.h"
static SHashObj *tsVnodesHash = NULL;
static int32_t vnodeInitHash(void);
static void vnodeCleanupHash(void);
static void vnodeIncRef(void *ptNode);
static SStep tsVnodeSteps[] = {
{"vnode-worker", vnodeInitMWorker, vnodeCleanupMWorker},
{"vnode-write", vnodeInitWrite, vnodeCleanupWrite},
{"vnode-read", vnodeInitRead, vnodeCleanupRead},
{"vnode-hash", vnodeInitHash, vnodeCleanupHash},
{"tsdb-queue", tsdbInitCommitQueue, tsdbDestroyCommitQueue}
};
int32_t vnodeInitMgmt() {
int32_t stepSize = sizeof(tsVnodeSteps) / sizeof(SStep);
return dnodeStepInit(tsVnodeSteps, stepSize);
}
void vnodeCleanupMgmt() {
int32_t stepSize = sizeof(tsVnodeSteps) / sizeof(SStep);
dnodeStepCleanup(tsVnodeSteps, stepSize);
}
static int32_t vnodeInitHash() {
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnodesHash == NULL) {
vError("failed to init vnode mgmt");
return -1;
}
return 0;
}
static void vnodeCleanupHash() {
if (tsVnodesHash != NULL) {
vDebug("vnode mgmt is cleanup");
taosHashCleanup(tsVnodesHash);
tsVnodesHash = NULL;
}
}
void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
}
void vnodeAddIntoHash(SVnodeObj *pVnode) {
taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
void vnodeRemoveFromHash(SVnodeObj *pVnode) {
taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t));
}
static void vnodeIncRef(void *ptNode) {
assert(ptNode != NULL);
SVnodeObj **ppVnode = (SVnodeObj **)ptNode;
assert(ppVnode);
assert(*ppVnode);
SVnodeObj *pVnode = *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
}
void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vDebug("vgId:%d, not exist", vgId);
return NULL;
}
return *ppVnode;
}
void vnodeRelease(void *vparam) {
SVnodeObj *pVnode = vparam;
if (vparam == NULL) return;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode);
assert(refCount >= 0);
if (refCount > 0) {
if (vnodeInResetStatus(pVnode) && refCount <= 3) {
tsem_post(&pVnode->sem);
}
} else {
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode);
vnodeDestroy(pVnode);
int32_t count = taosHashGetSize(tsVnodesHash);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", pVnode->vgId, count);
}
}
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
int64_t totalStorage = 0;
int64_t compStorage = 0;
int64_t pointsWritten = 0;
if (!vnodeInReadyStatus(pVnode)) return;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
if (pVnode->tsdb) {
tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
}
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
pLoad->totalStorage = htobe64(totalStorage);
pLoad->compStorage = htobe64(compStorage);
pLoad->pointsWritten = htobe64(pointsWritten);
pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica;
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
void *pIter = taosHashIterate(tsVnodesHash, NULL);
while (pIter) {
SVnodeObj **pVnode = pIter;
if (*pVnode) {
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = (*pVnode)->vgId;
}
}
pIter = taosHashIterate(tsVnodesHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
void vnodeBuildStatusMsg(void *param) {
SStatusMsg *pStatus = param;
void *pIter = taosHashIterate(tsVnodesHash, NULL);
while (pIter) {
SVnodeObj **pVnode = pIter;
if (*pVnode) {
vnodeBuildVloadMsg(*pVnode, pStatus);
}
pIter = taosHashIterate(tsVnodesHash, pIter);
}
}
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId);
if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
}
vnodeRelease(pVnode);
}
}
}
\ No newline at end of file
......@@ -16,26 +16,26 @@
#define _DEFAULT_SOURCE
#define _NON_BLOCKING_RETRIEVE 0
#include "os.h"
#include "tglobal.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "query.h"
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "tqueue.h"
#include "tglobal.h"
#include "query.h"
#include "vnodeStatus.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
void vnodeInitReadFp(void) {
int32_t vnodeInitRead(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
return 0;
}
void vnodeCleanupRead() {}
//
// After the fetch request enters the vnode queue, if the vnode cannot provide services, the process function are
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
......@@ -54,7 +54,7 @@ int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) {
}
static int32_t vnodeCheckRead(SVnodeObj *pVnode) {
if (pVnode->status != TAOS_VN_STATUS_READY) {
if (!vnodeInReadyStatus(pVnode)) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "vnodeStatus.h"
char* vnodeStatus[] = {
"init",
"ready",
"closing",
"updating",
"reset"
};
bool vnodeSetInitStatus(SVnodeObj* pVnode) {
pthread_mutex_lock(&pVnode->statusMutex);
pVnode->status = TAOS_VN_STATUS_INIT;
pthread_mutex_unlock(&pVnode->statusMutex);
return true;
}
bool vnodeSetReadyStatus(SVnodeObj* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_INIT || pVnode->status == TAOS_VN_STATUS_READY ||
pVnode->status == TAOS_VN_STATUS_UPDATING || pVnode->status == TAOS_VN_STATUS_RESET) {
pVnode->status = TAOS_VN_STATUS_READY;
set = true;
} else {
vDebug("vgId:%d, cannot set status:ready, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
pVnode->status = TAOS_VN_STATUS_CLOSING;
set = true;
} else {
vTrace("vgId:%d, cannot set status:closing, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeSetUpdatingStatus(SVnodeObj* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
pVnode->status = TAOS_VN_STATUS_UPDATING;
set = true;
} else {
vDebug("vgId:%d, cannot set status:updating, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeSetResetStatus(SVnodeObj* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) {
pVnode->status = TAOS_VN_STATUS_RESET;
set = true;
} else {
vDebug("vgId:%d, cannot set status:reset, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeInInitStatus(SVnodeObj* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_INIT) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
bool vnodeInReadyStatus(SVnodeObj* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
bool vnodeInClosingStatus(SVnodeObj* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
bool vnodeInResetStatus(SVnodeObj* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_RESET) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "query.h"
#include "dnode.h"
#include "vnodeVersion.h"
#include "vnodeMain.h"
uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while get file info", vgId);
return 0;
}
*fver = pVnode->fversion;
uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
vnodeRelease(pVnode);
return ret;
}
int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while get wal info", vgId);
return -1;
}
int32_t code = walGetWalFile(pVnode->wal, fileName, fileId);
vnodeRelease(pVnode);
return code;
}
void vnodeNotifyRole(int32_t vgId, int8_t role) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vTrace("vgId:%d, vnode not found while notify role", vgId);
return;
}
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
pVnode->role = role;
dnodeSendStatusMsgToMnode();
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) {
cqStart(pVnode->cq);
} else {
cqStop(pVnode->cq);
}
vnodeRelease(pVnode);
}
void vnodeCtrlFlow(int32_t vgId, int32_t level) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
return;
}
if (pVnode->flowctrlLevel != level) {
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
pVnode->flowctrlLevel = level;
}
vnodeRelease(pVnode);
}
int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify file synced", vgId);
return 0;
}
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
int32_t code = vnodeReset(pVnode);
vnodeRelease(pVnode);
return code;
}
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
void *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while confirm forward", vgId);
return;
}
dnodeSendRpcVWriteRsp(pVnode, wparam, code);
vnodeRelease(pVnode);
}
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam);
vnodeRelease(pVnode);
return code;
}
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
return -1;
}
int32_t code = 0;
if (pVnode->isCommiting) {
vDebug("vgId:%d, vnode is commiting while get version", vgId);
code = -1;
} else {
*fver = pVnode->fversion;
*wver = pVnode->version;
}
vnodeRelease(pVnode);
return code;
}
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) {
SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code);
}
......@@ -15,11 +15,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "cJSON.h"
#include "tglobal.h"
#include "tsdb.h"
#include "vnodeInt.h"
#include "vnodeVersion.h"
int32_t vnodeReadVersion(SVnodeObj *pVnode) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "tglobal.h"
#include "vnodeWorker.h"
typedef enum {
VNODE_WORKER_ACTION_CREATE,
VNODE_WORKER_ACTION_DELETE
} EVMWorkerAction;
typedef struct {
int32_t vgId;
int32_t code;
void * rpcHandle;
SVnodeObj *pVnode;
EVMWorkerAction action;
} SVMWorkerMsg;
typedef struct {
pthread_t thread;
int32_t workerId;
} SVMWorker;
typedef struct {
int32_t curNum;
int32_t maxNum;
SVMWorker *worker;
} SVMWorkerPool;
static SVMWorkerPool tsVMWorkerPool;
static taos_qset tsVMWorkerQset;
static taos_queue tsVMWorkerQueue;
static void *vnodeMWorkerFunc(void *param);
static int32_t vnodeStartMWorker() {
tsVMWorkerQueue = taosOpenQueue();
if (tsVMWorkerQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;
taosAddIntoQset(tsVMWorkerQset, tsVMWorkerQueue, NULL);
for (int32_t i = tsVMWorkerPool.curNum; i < tsVMWorkerPool.maxNum; ++i) {
SVMWorker *pWorker = tsVMWorkerPool.worker + i;
pWorker->workerId = i;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, vnodeMWorkerFunc, pWorker) != 0) {
vError("failed to create thread to process vmworker queue, reason:%s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
tsVMWorkerPool.curNum = i + 1;
vDebug("vmworker:%d is launched, total:%d", pWorker->workerId, tsVMWorkerPool.maxNum);
}
vDebug("vmworker queue:%p is allocated", tsVMWorkerQueue);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeInitMWorker() {
tsVMWorkerQset = taosOpenQset();
tsVMWorkerPool.maxNum = 1;
tsVMWorkerPool.curNum = 0;
tsVMWorkerPool.worker = calloc(sizeof(SVMWorker), tsVMWorkerPool.maxNum);
if (tsVMWorkerPool.worker == NULL) return -1;
for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) {
SVMWorker *pWorker = tsVMWorkerPool.worker + i;
pWorker->workerId = i;
vDebug("vmworker:%d is created", i);
}
vDebug("vmworker is initialized, num:%d qset:%p", tsVMWorkerPool.maxNum, tsVMWorkerQset);
return vnodeStartMWorker();
}
static void vnodeStopMWorker() {
vDebug("vmworker queue:%p is freed", tsVMWorkerQueue);
taosCloseQueue(tsVMWorkerQueue);
tsVMWorkerQueue = NULL;
}
void vnodeCleanupMWorker() {
for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) {
SVMWorker *pWorker = tsVMWorkerPool.worker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsVMWorkerQset);
}
vDebug("vmworker:%d is closed", i);
}
for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) {
SVMWorker *pWorker = tsVMWorkerPool.worker + i;
vDebug("vmworker:%d start to join", i);
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
vDebug("vmworker:%d join success", i);
}
vDebug("vmworker is closed, qset:%p", tsVMWorkerQset);
taosCloseQset(tsVMWorkerQset);
tsVMWorkerQset = NULL;
tfree(tsVMWorkerPool.worker);
vnodeStopMWorker();
}
static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *rpcHandle) {
SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg));
if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY;
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return TSDB_CODE_VND_INVALID_VGROUP_ID;
pMsg->vgId = vgId;
pMsg->pVnode = pVnode;
pMsg->rpcHandle = rpcHandle;
pMsg->action = action;
int32_t code = taosWriteQitem(tsVMWorkerQueue, TAOS_QTYPE_RPC, pMsg);
if (code == 0) code = TSDB_CODE_DND_ACTION_IN_PROGRESS;
return code;
}
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle) {
vTrace("vgId:%d, will open in vmworker", vgId);
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_CREATE, rpcHandle);
}
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle) {
vTrace("vgId:%d, will cleanup in vmworker", vgId);
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_DELETE, rpcHandle);
}
static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) {
vTrace("vgId:%d, disposed in vmworker", pMsg->vgId);
vnodeRelease(pMsg->pVnode);
taosFreeQitem(pMsg);
}
static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) {
SRpcMsg rpcRsp = {
.handle = pMsg->rpcHandle,
.code = pMsg->code,
};
rpcSendResponse(&rpcRsp);
vnodeFreeMWorkerMsg(pMsg);
}
static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
pMsg->code = 0;
switch (pMsg->action) {
case VNODE_WORKER_ACTION_CREATE:
pMsg->code = vnodeOpen(pMsg->vgId);
break;
case VNODE_WORKER_ACTION_DELETE:
pMsg->code = vnodeDrop(pMsg->vgId);
break;
default:
break;
}
}
static void *vnodeMWorkerFunc(void *param) {
while (1) {
SVMWorkerMsg *pMsg = NULL;
if (taosReadQitemFromQset(tsVMWorkerQset, NULL, (void **)&pMsg, NULL) == 0) {
vDebug("qset:%p, vmworker got no message from qset, exiting", tsVMWorkerQset);
break;
}
vTrace("vgId:%d, action:%d will be processed in vmworker queue", pMsg->vgId, pMsg->action);
vnodeProcessMWorkerMsg(pMsg);
vnodeSendVMWorkerRpcRsp(pMsg);
}
return NULL;
}
......@@ -19,17 +19,9 @@
#include "taoserror.h"
#include "tglobal.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "tsync.h"
#include "ttimer.h"
#include "tdataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "syncInt.h"
#include "tcq.h"
#include "dnode.h"
#include "vnodeStatus.h"
#define MAX_QUEUED_MSG_NUM 10000
......@@ -43,15 +35,19 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite);
void vnodeInitWriteFp(void) {
int32_t vnodeInitWrite(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
return 0;
}
void vnodeCleanupWrite() {}
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0;
SVnodeObj *pVnode = vparam;
......@@ -68,7 +64,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version, pVnode->version);
if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) {
if (!vnodeInReadyStatus(pVnode)) {
vDebug("vgId:%d, msg:%s not processed since vstatus:%d, qtype:%s hver:%" PRIu64, pVnode->vgId,
taosMsg[pHead->msgType], pVnode->status, qtypeStr[qtype], pHead->version);
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
......@@ -118,7 +114,7 @@ static int32_t vnodeCheckWrite(void *vparam) {
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
if (vnodeInClosingStatus(pVnode)) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
......@@ -132,11 +128,6 @@ static int32_t vnodeCheckWrite(void *vparam) {
return TSDB_CODE_SUCCESS;
}
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) {
SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code);
}
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int32_t code = TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册