提交 cc52d293 编写于 作者: S slguan

fix error while sync sdb

上级 a7e1c7ce
...@@ -25,6 +25,7 @@ void dnodeCleanupMClient(); ...@@ -25,6 +25,7 @@ void dnodeCleanupMClient();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
uint32_t dnodeGetMnodeMasteIp(); uint32_t dnodeGetMnodeMasteIp();
void * dnodeGetMpeerInfos(); void * dnodeGetMpeerInfos();
int32_t dnodeGetDnodeId();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,8 +23,6 @@ extern "C" { ...@@ -23,8 +23,6 @@ extern "C" {
int32_t dnodeInitMgmt(); int32_t dnodeInitMgmt();
void dnodeCleanupMgmt(); void dnodeCleanupMgmt();
void dnodeMgmt(SRpcMsg *rpcMsg); void dnodeMgmt(SRpcMsg *rpcMsg);
void dnodeUpdateDnodeId(int32_t dnodeId);
int32_t dnodeGetDnodeId();
void* dnodeGetVnode(int32_t vgId); void* dnodeGetVnode(int32_t vgId);
int32_t dnodeGetVnodeStatus(void *pVnode); int32_t dnodeGetVnodeStatus(void *pVnode);
......
...@@ -21,30 +21,51 @@ ...@@ -21,30 +21,51 @@
#include "trpc.h" #include "trpc.h"
#include "tutil.h" #include "tutil.h"
#include "tsync.h" #include "tsync.h"
#include "ttime.h"
#include "ttimer.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeMClient.h" #include "dnodeMClient.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "vnode.h"
#define MPEER_CONTENT_LEN 2000 #define MPEER_CONTENT_LEN 2000
static bool dnodeReadMnodeIpList(); static bool dnodeReadMnodeIpList();
static void dnodeSaveMnodeIpList(); static void dnodeSaveMnodeIpList();
static void dnodeReadDnodeInfo();
static void dnodeUpdateDnodeInfo(int32_t dnodeId);
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); static void dnodeProcessRspFromMnode(SRpcMsg *pMsg);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void *tsDnodeMClientRpc = NULL; static void *tsDnodeMClientRpc = NULL;
static SRpcIpSet tsMnodeIpList = {0}; static SRpcIpSet tsMnodeIpList = {0};
static SDMNodeInfos tsMnodeInfos = {0}; static SDMNodeInfos tsMnodeInfos = {0};
static void *tsDnodeTmr = NULL;
static void *tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static int32_t tsDnodeId = 0;
static char tsDnodeName[TSDB_NODE_NAME_LEN];
int32_t dnodeInitMClient() { int32_t dnodeInitMClient() {
dnodeReadDnodeInfo();
tsRebootTime = taosGetTimestampSec();
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
if (!dnodeReadMnodeIpList()) { if (!dnodeReadMnodeIpList()) {
memset(&tsMnodeIpList, 0, sizeof(SRpcIpSet)); memset(&tsMnodeIpList, 0, sizeof(SRpcIpSet));
memset(&tsMnodeInfos, 0, sizeof(SDMNodeInfos)); memset(&tsMnodeInfos, 0, sizeof(SDMNodeInfos));
tsMnodeIpList.port = tsMnodeDnodePort; tsMnodeIpList.port = tsMnodeDnodePort;
tsMnodeIpList.numOfIps = 1; tsMnodeIpList.numOfIps = 1;
tsMnodeIpList.ip[0] = inet_addr(tsMasterIp); tsMnodeIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) { if (strcmp(tsSecondIp, tsMasterIp) != 0) {
tsMnodeIpList.numOfIps = 2; tsMnodeIpList.numOfIps = 2;
tsMnodeIpList.ip[1] = inet_addr(tsSecondIp); tsMnodeIpList.ip[1] = inet_addr(tsSecondIp);
} }
...@@ -57,8 +78,6 @@ int32_t dnodeInitMClient() { ...@@ -57,8 +78,6 @@ int32_t dnodeInitMClient() {
} }
} }
tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
...@@ -79,11 +98,24 @@ int32_t dnodeInitMClient() { ...@@ -79,11 +98,24 @@ int32_t dnodeInitMClient() {
return -1; return -1;
} }
tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
dPrint("mnode rpc client is opened"); dPrint("mnode rpc client is opened");
return 0; return 0;
} }
void dnodeCleanupMClient() { void dnodeCleanupMClient() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
if (tsDnodeTmr != NULL) {
taosTmrCleanUp(tsDnodeTmr);
tsDnodeTmr = NULL;
}
if (tsDnodeMClientRpc) { if (tsDnodeMClientRpc) {
rpcClose(tsDnodeMClientRpc); rpcClose(tsDnodeMClientRpc);
tsDnodeMClientRpc = NULL; tsDnodeMClientRpc = NULL;
...@@ -104,6 +136,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { ...@@ -104,6 +136,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code)); dError("status rsp is received, error:%s", tstrerror(pMsg->code));
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return; return;
} }
...@@ -111,6 +144,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -111,6 +144,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
SDMNodeInfos *mpeers = &pStatusRsp->mpeers; SDMNodeInfos *mpeers = &pStatusRsp->mpeers;
if (mpeers->nodeNum <= 0) { if (mpeers->nodeNum <= 0) {
dError("status msg is invalid, num of ips is %d", mpeers->nodeNum); dError("status msg is invalid, num of ips is %d", mpeers->nodeNum);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return; return;
} }
...@@ -122,14 +156,16 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -122,14 +156,16 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
mgmtIpSet.ip[i] = htonl(mpeers->nodeInfos[i].nodeIp); mgmtIpSet.ip[i] = htonl(mpeers->nodeInfos[i].nodeIp);
} }
if (memcmp(&mgmtIpSet, &tsMnodeIpList, sizeof(SRpcIpSet)) != 0) { if (memcmp(&mgmtIpSet, &tsMnodeIpList, sizeof(SRpcIpSet)) != 0 || tsMnodeInfos.nodeNum == 0) {
memcpy(&tsMnodeIpList, &mgmtIpSet, sizeof(SRpcIpSet)); memcpy(&tsMnodeIpList, &mgmtIpSet, sizeof(SRpcIpSet));
memcpy(&tsMnodeInfos, mpeers, sizeof(SDMNodeInfos)); tsMnodeInfos.inUse = mpeers->inUse;
tsMnodeInfos.nodeNum = mpeers->nodeNum;
dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse); dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < mpeers->nodeNum; i++) { for (int32_t i = 0; i < mpeers->nodeNum; i++) {
tsMnodeInfos.nodeInfos[i].nodeId = htonl(mpeers->nodeInfos[i].nodeId); tsMnodeInfos.nodeInfos[i].nodeId = htonl(mpeers->nodeInfos[i].nodeId);
tsMnodeInfos.nodeInfos[i].nodeIp = htonl(mpeers->nodeInfos[i].nodeIp); tsMnodeInfos.nodeInfos[i].nodeIp = htonl(mpeers->nodeInfos[i].nodeIp);
tsMnodeInfos.nodeInfos[i].nodePort = htons(mpeers->nodeInfos[i].nodePort); tsMnodeInfos.nodeInfos[i].nodePort = htons(mpeers->nodeInfos[i].nodePort);
strcpy(tsMnodeInfos.nodeInfos[i].nodeName, mpeers->nodeInfos[i].nodeName);
dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId, dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId,
taosIpStr(tsMnodeInfos.nodeInfos[i].nodeId), tsMnodeInfos.nodeInfos[i].nodePort, taosIpStr(tsMnodeInfos.nodeInfos[i].nodeId), tsMnodeInfos.nodeInfos[i].nodePort,
tsMnodeInfos.nodeInfos[i].nodeName); tsMnodeInfos.nodeInfos[i].nodeName);
...@@ -144,7 +180,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -144,7 +180,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
pState->dnodeId = htonl(pState->dnodeId); pState->dnodeId = htonl(pState->dnodeId);
dnodeProcessModuleStatus(pState->moduleStatus); dnodeProcessModuleStatus(pState->moduleStatus);
dnodeUpdateDnodeId(pState->dnodeId); dnodeUpdateDnodeInfo(pState->dnodeId);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
} }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
...@@ -295,3 +332,92 @@ uint32_t dnodeGetMnodeMasteIp() { ...@@ -295,3 +332,92 @@ uint32_t dnodeGetMnodeMasteIp() {
void* dnodeGetMpeerInfos() { void* dnodeGetMpeerInfos() {
return &tsMnodeInfos; return &tsMnodeInfos;
} }
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
return;
}
if (tsStatusTimer == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to start status timer");
return;
}
int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SDMStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to malloc status message");
return;
}
strcpy(pStatus->dnodeName, tsDnodeName);
pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(tsDnodeId);
pStatus->privateIp = htonl(inet_addr(tsPrivateIp));
pStatus->publicIp = htonl(inet_addr(tsPublicIp));
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {
.pCont = pStatus,
.contLen = contLen,
.msgType = TSDB_MSG_TYPE_DM_STATUS
};
dnodeSendMsgToMnode(&rpcMsg);
}
static void dnodeReadDnodeInfo() {
char dnodeIdFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir);
FILE *fp = fopen(dnodeIdFile, "r");
if (!fp) return;
char option[32] = {0};
int32_t value = 0;
int32_t num = 0;
num = fscanf(fp, "%s %d", option, &value);
if (num != 2) return;
if (strcmp(option, "dnodeId") != 0) return;
tsDnodeId = value;;
fclose(fp);
dPrint("read dnodeId:%d successed", tsDnodeId);
}
static void dnodeSaveDnodeInfo() {
char dnodeIdFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir);
FILE *fp = fopen(dnodeIdFile, "w");
if (!fp) return;
fprintf(fp, "dnodeId %d\n", tsDnodeId);
fclose(fp);
dPrint("save dnodeId successed");
}
void dnodeUpdateDnodeInfo(int32_t dnodeId) {
if (tsDnodeId == 0) {
dPrint("dnodeId is set to %d", dnodeId);
tsDnodeId = dnodeId;
dnodeSaveDnodeInfo();
}
}
int32_t dnodeGetDnodeId() {
return tsDnodeId;
}
\ No newline at end of file
...@@ -21,8 +21,6 @@ ...@@ -21,8 +21,6 @@
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
#include "twal.h" #include "twal.h"
#include "dnodeMClient.h" #include "dnodeMClient.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
...@@ -38,52 +36,23 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); ...@@ -38,52 +36,23 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void dnodeReadDnodeId();
static void *tsDnodeTmr = NULL;
static void *tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static int32_t tsDnodeId = 0;
static char tsDnodeName[TSDB_NODE_NAME_LEN];
int32_t dnodeInitMgmt() { int32_t dnodeInitMgmt() {
dnodeReadDnodeId();
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
tsRebootTime = taosGetTimestampSec();
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
int32_t code = dnodeOpenVnodes(); int32_t code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return -1; return -1;
} }
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeCleanupMgmt() { void dnodeCleanupMgmt() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
if (tsDnodeTmr != NULL) {
taosTmrCleanUp(tsDnodeTmr);
tsDnodeTmr = NULL;
}
dnodeCloseVnodes(); dnodeCloseVnodes();
} }
...@@ -213,93 +182,3 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { ...@@ -213,93 +182,3 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
return tsCfgDynamicOptions(pCfg->config); return tsCfgDynamicOptions(pCfg->config);
} }
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
return;
}
if (tsStatusTimer == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to start status timer");
return;
}
int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SDMStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to malloc status message");
return;
}
strcpy(pStatus->dnodeName, tsDnodeName);
pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(tsDnodeId);
pStatus->privateIp = htonl(inet_addr(tsPrivateIp));
pStatus->publicIp = htonl(inet_addr(tsPublicIp));
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {
.pCont = pStatus,
.contLen = contLen,
.msgType = TSDB_MSG_TYPE_DM_STATUS
};
dnodeSendMsgToMnode(&rpcMsg);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
}
static void dnodeReadDnodeId() {
char dnodeIdFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir);
FILE *fp = fopen(dnodeIdFile, "r");
if (!fp) return;
char option[32] = {0};
int32_t value = 0;
int32_t num = 0;
num = fscanf(fp, "%s %d", option, &value);
if (num != 2) return;
if (strcmp(option, "dnodeId") != 0) return;
tsDnodeId = value;;
fclose(fp);
dPrint("read dnodeId:%d successed", tsDnodeId);
}
static void dnodeSaveDnodeId() {
char dnodeIdFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir);
FILE *fp = fopen(dnodeIdFile, "w");
if (!fp) return;
fprintf(fp, "dnodeId %d\n", tsDnodeId);
fclose(fp);
dPrint("save dnodeId successed");
}
void dnodeUpdateDnodeId(int32_t dnodeId) {
if (tsDnodeId == 0) {
dPrint("dnodeId is set to %d", dnodeId);
tsDnodeId = dnodeId;
dnodeSaveDnodeId();
}
}
int32_t dnodeGetDnodeId() {
return tsDnodeId;
}
\ No newline at end of file
...@@ -37,6 +37,7 @@ int32_t clusterInit(); ...@@ -37,6 +37,7 @@ int32_t clusterInit();
void clusterCleanUp(); void clusterCleanUp();
char* clusterGetDnodeStatusStr(int32_t dnodeStatus); char* clusterGetDnodeStatusStr(int32_t dnodeStatus);
bool clusterCheckModuleInDnode(struct _dnode_obj *pDnode, int moduleType); bool clusterCheckModuleInDnode(struct _dnode_obj *pDnode, int moduleType);
void clusterMonitorDnodeModule();
int32_t clusterInitDnodes(); int32_t clusterInitDnodes();
void clusterCleanupDnodes(); void clusterCleanupDnodes();
......
...@@ -21,8 +21,8 @@ extern "C" { ...@@ -21,8 +21,8 @@ extern "C" {
#endif #endif
typedef enum { typedef enum {
SDB_TABLE_MNODE = 0, SDB_TABLE_DNODE = 0,
SDB_TABLE_DNODE = 1, SDB_TABLE_MNODE = 1,
SDB_TABLE_ACCOUNT = 2, SDB_TABLE_ACCOUNT = 2,
SDB_TABLE_USER = 3, SDB_TABLE_USER = 3,
SDB_TABLE_DB = 4, SDB_TABLE_DB = 4,
...@@ -90,7 +90,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow); ...@@ -90,7 +90,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
void sdbIncRef(void *thandle, void *pRow); void sdbIncRef(void *thandle, void *pRow);
void sdbDecRef(void *thandle, void *pRow); void sdbDecRef(void *thandle, void *pRow);
int64_t sdbGetNumOfRows(void *handle); int64_t sdbGetNumOfRows(void *handle);
int64_t sdbGetId(void *handle); int32_t sdbGetId(void *handle);
uint64_t sdbGetVersion(); uint64_t sdbGetVersion();
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -208,6 +208,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -208,6 +208,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mTrace("dnode:%d, from offline to online", pDnode->dnodeId); mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TAOS_DN_STATUS_READY; pDnode->status = TAOS_DN_STATUS_READY;
balanceNotify(); balanceNotify();
clusterMonitorDnodeModule();
} }
clusterReleaseDnode(pDnode); clusterReleaseDnode(pDnode);
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tqueue.h"
#include "twal.h" #include "twal.h"
#include "hashint.h" #include "hashint.h"
#include "hashstr.h" #include "hashstr.h"
...@@ -67,7 +68,7 @@ static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIn ...@@ -67,7 +68,7 @@ static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIn
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData};
uint64_t sdbGetVersion() { return tsSdbObj->version; } uint64_t sdbGetVersion() { return tsSdbObj->version; }
int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; }
int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }
static char *sdbGetActionStr(int32_t action) { static char *sdbGetActionStr(int32_t action) {
...@@ -116,7 +117,7 @@ int32_t sdbInit() { ...@@ -116,7 +117,7 @@ int32_t sdbInit() {
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t numOfTables = 0; int32_t numOfTables = 0;
for (int32_t tableId = SDB_TABLE_MNODE; tableId < SDB_TABLE_MAX; ++tableId) { for (int32_t tableId = SDB_TABLE_DNODE; tableId < SDB_TABLE_MAX; ++tableId) {
SSdbTable *pTable = sdbGetTableFromId(tableId); SSdbTable *pTable = sdbGetTableFromId(tableId);
if (pTable == NULL) continue; if (pTable == NULL) continue;
if (pTable->restoredFp) { if (pTable->restoredFp) {
...@@ -275,8 +276,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ ...@@ -275,8 +276,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_
} }
walFsync(tsSdbObj->wal); walFsync(tsSdbObj->wal);
free(pHead); taosFreeQitem(pHead);
return code; return code;
} }
...@@ -390,7 +390,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) { ...@@ -390,7 +390,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
if (pOper->type == SDB_OPER_GLOBAL) { if (pOper->type == SDB_OPER_GLOBAL) {
int32_t size = sizeof(SWalHead) + pTable->maxRowSize; int32_t size = sizeof(SWalHead) + pTable->maxRowSize;
SWalHead *pHead = calloc(1, size); SWalHead *pHead = taosAllocateQitem(size);
pHead->version = 0; pHead->version = 0;
pHead->len = pOper->rowSize; pHead->len = pOper->rowSize;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;
...@@ -435,7 +435,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) { ...@@ -435,7 +435,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
} }
int32_t size = sizeof(SWalHead) + rowSize; int32_t size = sizeof(SWalHead) + rowSize;
SWalHead *pHead = calloc(1, size); SWalHead *pHead = taosAllocateQitem(size);
pHead->version = 0; pHead->version = 0;
pHead->len = rowSize; pHead->len = rowSize;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
...@@ -463,7 +463,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { ...@@ -463,7 +463,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
if (pOper->type == SDB_OPER_GLOBAL) { if (pOper->type == SDB_OPER_GLOBAL) {
int32_t size = sizeof(SWalHead) + pTable->maxRowSize; int32_t size = sizeof(SWalHead) + pTable->maxRowSize;
SWalHead *pHead = calloc(1, size); SWalHead *pHead = taosAllocateQitem(size);
pHead->version = 0; pHead->version = 0;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;
......
...@@ -160,7 +160,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -160,7 +160,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.confirmForward = dnodeSendRpcWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
pVnode->sync = syncStart(&syncInfo);; pVnode->sync = syncStart(&syncInfo);
pVnode->events = NULL; pVnode->events = NULL;
pVnode->cq = NULL; pVnode->cq = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册