提交 a1c8ec36 编写于 作者: S slguan

[TD-52] fix bug in sdb sync

上级 f3b76d1a
...@@ -23,8 +23,6 @@ extern "C" { ...@@ -23,8 +23,6 @@ extern "C" {
int32_t dnodeInitMClient(); int32_t dnodeInitMClient();
void dnodeCleanupMClient(); void dnodeCleanupMClient();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
void * dnodeGetMnodeList();
int32_t dnodeGetDnodeId();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,11 +22,11 @@ ...@@ -22,11 +22,11 @@
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "vnode.h"
#include "dnodeMClient.h" #include "dnodeMClient.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "vnode.h"
static int32_t dnodeOpenVnodes(); static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes(); static void dnodeCloseVnodes();
......
...@@ -118,7 +118,7 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { ...@@ -118,7 +118,7 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
dPrint("module status is received, start mgmt module", tsModuleStatus, moduleStatus); dPrint("module status is received, start mgmt module", tsModuleStatus, moduleStatus);
tsModule[TSDB_MOD_MGMT].enable = true; tsModule[TSDB_MOD_MGMT].enable = true;
dnodeSetModuleStatus(TSDB_MOD_MGMT); dnodeSetModuleStatus(TSDB_MOD_MGMT);
(*tsModule[TSDB_MOD_MGMT].stopFp)(); (*tsModule[TSDB_MOD_MGMT].startFp)();
} }
if (tsModule[TSDB_MOD_MGMT].enable && !enableMgmtModule) { if (tsModule[TSDB_MOD_MGMT].enable && !enableMgmtModule) {
......
...@@ -43,6 +43,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); ...@@ -43,6 +43,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
bool dnodeIsFirstDeploy(); bool dnodeIsFirstDeploy();
uint32_t dnodeGetMnodeMasteIp(); uint32_t dnodeGetMnodeMasteIp();
void * dnodeGetMnodeList();
int32_t dnodeGetDnodeId();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -584,6 +584,7 @@ typedef struct { ...@@ -584,6 +584,7 @@ typedef struct {
char dnodeName[TSDB_NODE_NAME_LEN + 1]; char dnodeName[TSDB_NODE_NAME_LEN + 1];
uint32_t privateIp; uint32_t privateIp;
uint32_t publicIp; uint32_t publicIp;
uint32_t moduleStatus;
uint32_t lastReboot; // time stamp for last reboot uint32_t lastReboot; // time stamp for last reboot
uint16_t numOfTotalVnodes; // from config file uint16_t numOfTotalVnodes; // from config file
uint16_t openVnodes; uint16_t openVnodes;
......
...@@ -31,7 +31,6 @@ int32_t mgmtInitDnodes(); ...@@ -31,7 +31,6 @@ int32_t mgmtInitDnodes();
void mgmtCleanupDnodes(); void mgmtCleanupDnodes();
char* mgmtGetDnodeStatusStr(int32_t dnodeStatus); char* mgmtGetDnodeStatusStr(int32_t dnodeStatus);
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
void mgmtMonitorDnodeModule(); void mgmtMonitorDnodeModule();
int32_t mgmtGetDnodesNum(); int32_t mgmtGetDnodesNum();
......
...@@ -129,7 +129,7 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) { ...@@ -129,7 +129,7 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) {
static int32_t mgmtDnodeActionRestored() { static int32_t mgmtDnodeActionRestored() {
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
if (numOfRows <= 0 && strcmp(tsMasterIp, tsPrivateIp) == 0) { if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
uint32_t ip = inet_addr(tsPrivateIp); uint32_t ip = inet_addr(tsPrivateIp);
mgmtCreateDnode(ip); mgmtCreateDnode(ip);
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
...@@ -276,6 +276,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -276,6 +276,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->privateIp = htonl(pStatus->privateIp); pStatus->privateIp = htonl(pStatus->privateIp);
pStatus->publicIp = htonl(pStatus->publicIp); pStatus->publicIp = htonl(pStatus->publicIp);
pStatus->moduleStatus = htonl(pStatus->moduleStatus);
pStatus->lastReboot = htonl(pStatus->lastReboot); pStatus->lastReboot = htonl(pStatus->lastReboot);
pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
...@@ -311,6 +312,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -311,6 +312,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode->diskAvailable = pStatus->diskAvailable; pDnode->diskAvailable = pStatus->diskAvailable;
pDnode->alternativeRole = pStatus->alternativeRole; pDnode->alternativeRole = pStatus->alternativeRole;
pDnode->totalVnodes = pStatus->numOfTotalVnodes; pDnode->totalVnodes = pStatus->numOfTotalVnodes;
pDnode->moduleStatus = pStatus->moduleStatus;
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
...@@ -353,7 +355,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -353,7 +355,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mgmtGetMnodeList(&pRsp->mnodes); mgmtGetMnodeList(&pRsp->mnodes);
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId);
pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); pRsp->dnodeState.moduleStatus = htonl((int32_t)pDnode->isMgmt);
pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000); pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000);
pRsp->dnodeState.numOfVnodes = 0; pRsp->dnodeState.numOfVnodes = 0;
...@@ -391,10 +393,6 @@ static int32_t mgmtCreateDnode(uint32_t ip) { ...@@ -391,10 +393,6 @@ static int32_t mgmtCreateDnode(uint32_t ip) {
pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM; pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM;
sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1); sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1);
if (pDnode->privateIp == inet_addr(tsMasterIp)) {
pDnode->moduleStatus |= (1 << TSDB_MOD_MGMT);
}
SSdbOperDesc oper = { SSdbOperDesc oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDnodeSdb, .table = tsDnodeSdb,
...@@ -620,7 +618,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -620,7 +618,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
return numOfRows; return numOfRows;
} }
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { static bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
uint32_t status = pDnode->moduleStatus & (1 << moduleType); uint32_t status = pDnode->moduleStatus & (1 << moduleType);
return status > 0; return status > 0;
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "treplica.h" #include "treplica.h"
#include "tgrant.h" #include "tgrant.h"
#include "ttimer.h" #include "ttimer.h"
#include "dnode.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtLog.h" #include "mgmtLog.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
...@@ -100,6 +101,10 @@ int32_t mgmtStartSystem() { ...@@ -100,6 +101,10 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (replicaInit() < 0) {
mError("failed to init replica")
}
if (mgmtInitDClient() < 0) { if (mgmtInitDClient() < 0) {
return -1; return -1;
} }
...@@ -108,10 +113,6 @@ int32_t mgmtStartSystem() { ...@@ -108,10 +113,6 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (replicaInit() < 0) {
mError("failed to init dnode balance")
}
grantReset(TSDB_GRANT_ALL, 0); grantReset(TSDB_GRANT_ALL, 0);
tsMgmtIsRunning = true; tsMgmtIsRunning = true;
......
...@@ -55,6 +55,12 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) { ...@@ -55,6 +55,12 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) {
static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) { static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) {
SMnodeObj *pMnode = pOper->pObj; SMnodeObj *pMnode = pOper->pObj;
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
pDnode->isMgmt = false;
mgmtReleaseDnode(pDnode);
mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId); mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -69,10 +69,16 @@ static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, s ...@@ -69,10 +69,16 @@ static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, s
static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash}; static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash};
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData};
uint64_t sdbGetVersion() { return tsSdbObj->version; }
int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; }
int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }
uint64_t sdbGetVersion() {
if (tsSdbObj)
return tsSdbObj->version;
else
return 0;
}
static char *sdbGetActionStr(int32_t action) { static char *sdbGetActionStr(int32_t action) {
switch (action) { switch (action) {
case SDB_ACTION_INSERT: case SDB_ACTION_INSERT:
...@@ -147,10 +153,6 @@ void sdbCleanUp() { ...@@ -147,10 +153,6 @@ void sdbCleanUp() {
} }
} }
SSdbObject *sdbGetObj() {
return tsSdbObj;
}
void sdbIncRef(void *handle, void *pRow) { void sdbIncRef(void *handle, void *pRow) {
if (pRow) { if (pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "trpc.h" #include "trpc.h"
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
#include "dnode.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtLog.h" #include "mgmtLog.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
...@@ -93,7 +94,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { ...@@ -93,7 +94,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
} }
static int32_t mgmtUserActionRestored() { static int32_t mgmtUserActionRestored() {
if (strcmp(tsMasterIp, tsPrivateIp) == 0) { if (dnodeIsFirstDeploy()) {
SAcctObj *pAcct = mgmtGetAcct("root"); SAcctObj *pAcct = mgmtGetAcct("root");
mgmtCreateUser(pAcct, "root", "taosdata"); mgmtCreateUser(pAcct, "root", "taosdata");
mgmtCreateUser(pAcct, "monitor", tsInternalPass); mgmtCreateUser(pAcct, "monitor", tsInternalPass);
......
...@@ -110,12 +110,7 @@ short tsDaysPerFile = 10; ...@@ -110,12 +110,7 @@ short tsDaysPerFile = 10;
int tsDaysToKeep = 3650; int tsDaysToKeep = 3650;
int tsReplications = TSDB_REPLICA_MIN_NUM; int tsReplications = TSDB_REPLICA_MIN_NUM;
#ifdef _MPEER
int tsNumOfMPeers = 3; int tsNumOfMPeers = 3;
#else
int tsNumOfMPeers = 1;
#endif
int tsMaxShellConns = 2000; int tsMaxShellConns = 2000;
int tsMaxTables = 100000; int tsMaxTables = 100000;
...@@ -556,7 +551,7 @@ static void doInitGlobalConfig() { ...@@ -556,7 +551,7 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "tblocks", &tsNumOfBlocksPerMeter, TSDB_CFG_VTYPE_SHORT, tsInitConfigOption(cfg++, "tblocks", &tsNumOfBlocksPerMeter, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
32, 4096, 0, TSDB_CFG_UTYPE_NONE); 32, 4096, 0, TSDB_CFG_UTYPE_NONE);
#ifdef _MPEER #ifdef _SYNC
tsInitConfigOption(cfg++, "numOfMPeers", &tsNumOfMPeers, TSDB_CFG_VTYPE_INT, tsInitConfigOption(cfg++, "numOfMPeers", &tsNumOfMPeers, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER,
1, 3, 0, TSDB_CFG_UTYPE_NONE); 1, 3, 0, TSDB_CFG_UTYPE_NONE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册