提交 107518ee 编写于 作者: S slguan

[TD-17]

上级 5fb1dd5e
...@@ -30,24 +30,24 @@ ...@@ -30,24 +30,24 @@
#include "dnodeWrite.h" #include "dnodeWrite.h"
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int8_t dirty; EVnodeStatus status; // status: master, slave, notready, deleting
int8_t status; // status: master, slave, notready, deleting int64_t version;
int64_t version; void * wworker;
void *wworker; void * rworker;
void *rworker; void * wal;
void *wal; void * tsdb;
void *tsdb; void * replica;
void *replica; void * events;
void *events; void * cq; // continuous query
void *cq; // continuous query
} SVnodeObj; } SVnodeObj;
static int32_t dnodeOpenVnodes(); static int32_t dnodeOpenVnodes();
static void dnodeCleanupVnodes(); static void dnodeCleanupVnodes();
static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir); static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir);
static void dnodeCleanupVnode(SVnodeObj *pVnode); static void dnodeCleanupVnode(SVnodeObj *pVnode);
static void dnodeDoCleanupVnode(SVnodeObj *pVnode);
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg); static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg);
static void dnodeDropVnode(SVnodeObj *pVnode); static void dnodeDropVnode(SVnodeObj *pVnode);
static void dnodeDoDropVnode(SVnodeObj *pVnode); static void dnodeDoDropVnode(SVnodeObj *pVnode);
...@@ -89,9 +89,14 @@ int32_t dnodeInitMgmt() { ...@@ -89,9 +89,14 @@ int32_t dnodeInitMgmt() {
dError("failed to init dnode timer"); dError("failed to init dnode timer");
return -1; return -1;
} }
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
int32_t code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
return dnodeOpenVnodes(); taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
return TSDB_CODE_SUCCESS;
} }
void dnodeCleanupMgmt() { void dnodeCleanupMgmt() {
...@@ -140,7 +145,9 @@ void *dnodeGetVnode(int32_t vgId) { ...@@ -140,7 +145,9 @@ void *dnodeGetVnode(int32_t vgId) {
return NULL; return NULL;
} }
atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("pVnode:%p, vgroup:%d, get vnode, refCount:%d", pVnode->vgId, refCount);
return pVnode; return pVnode;
} }
...@@ -166,10 +173,24 @@ void *dnodeGetVnodeTsdb(void *pVnode) { ...@@ -166,10 +173,24 @@ void *dnodeGetVnodeTsdb(void *pVnode) {
void dnodeReleaseVnode(void *pVnodeRaw) { void dnodeReleaseVnode(void *pVnodeRaw) {
SVnodeObj *pVnode = pVnodeRaw; SVnodeObj *pVnode = pVnodeRaw;
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (count == 0 && pVnode->dirty) { int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
dnodeDoDropVnode(pVnode); if (pVnode->status == TSDB_VN_STATUS_DELETING) {
if (refCount <= 0) {
dPrint("pVnode:%p, vgroup:%d, drop vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
dnodeDoDropVnode(pVnode);
} else {
dTrace("pVnode:%p, vgroup:%d, vnode will be dropped until refCount:%d is 0", pVnode, pVnode->vgId, refCount);
}
} else if (pVnode->status == TSDB_VN_STATUS_CLOSING) {
if (refCount <= 0) {
dPrint("pVnode:%p, vgroup:%d, cleanup vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
dnodeDoCleanupVnode(pVnode);
} else {
dTrace("pVnode:%p, vgroup:%d, vnode will cleanup until refCount:%d is 0", pVnode, pVnode->vgId, refCount);
}
} else {
dTrace("pVnode:%p, vgroup:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
} }
} }
...@@ -210,42 +231,42 @@ static void dnodeCleanupVnodes() { ...@@ -210,42 +231,42 @@ static void dnodeCleanupVnodes() {
} }
static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
SVnodeObj vnodeObj = {0};
vnodeObj.vgId = vnode;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
char tsdbDir[TSDB_FILENAME_LEN]; char tsdbDir[TSDB_FILENAME_LEN];
sprintf(tsdbDir, "%s/tsdb", rootDir); sprintf(tsdbDir, "%s/tsdb", rootDir);
void *pTsdb = tsdbOpenRepo(tsdbDir); void *pTsdb = tsdbOpenRepo(tsdbDir);
if (pTsdb == NULL) { if (pTsdb == NULL) {
dError("failed to open tsdb in vnode:%d %s, reason:%s", vnode, tsdbDir, tstrerror(terrno)); dError("pVnode:%p, vgroup:%d, failed to open tsdb in %s, reason:%s", pVnode, pVnode->vgId, tsdbDir, tstrerror(terrno));
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return terrno; return terrno;
} }
//STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb); pVnode->wal = NULL;
pVnode->tsdb = pTsdb;
SVnodeObj vnodeObj = {0}; pVnode->replica = NULL;
vnodeObj.vgId = vnode;//tsdbInfo->tsdbCfg.tsdbId; pVnode->events = NULL;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; pVnode->cq = NULL;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
vnodeObj.events = NULL;
vnodeObj.cq = NULL;
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
pVnode->wworker = dnodeAllocateWriteWorker(pVnode); pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
pVnode->rworker = dnodeAllocateReadWorker(pVnode); pVnode->rworker = dnodeAllocateReadWorker(pVnode);
dTrace("open vnode:%d in %s", pVnode->vgId, rootDir); //TODO: jude status while replca is not null
if (pVnode->replica == NULL) {
pVnode->status = TSDB_VN_STATUS_MASTER;
}
dTrace("pVnode:%p, vgroup:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void dnodeCleanupVnode(SVnodeObj *pVnode) { static void dnodeDoCleanupVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_NOT_READY; dTrace("pVnode:%p, vgroup:%d, cleanup vnode", pVnode, pVnode->vgId);
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (count > 0) {
// wait refcount
}
// remove replica // remove replica
// remove read queue // remove read queue
...@@ -263,8 +284,11 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) { ...@@ -263,8 +284,11 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) {
tsdbCloseRepo(pVnode->tsdb); tsdbCloseRepo(pVnode->tsdb);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
} }
}
dTrace("cleanup vnode:%d", pVnode->vgId); static void dnodeCleanupVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_CLOSING;
dnodeReleaseVnode(pVnode);
} }
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
...@@ -311,7 +335,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -311,7 +335,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
SVnodeObj vnodeObj = {0}; SVnodeObj vnodeObj = {0};
vnodeObj.vgId = pVnodeCfg->cfg.vgId; vnodeObj.vgId = pVnodeCfg->cfg.vgId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_CREATING;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = 0;
vnodeObj.wal = NULL; vnodeObj.wal = NULL;
...@@ -323,32 +347,27 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -323,32 +347,27 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
pVnode->wworker = dnodeAllocateWriteWorker(pVnode); pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
pVnode->rworker = dnodeAllocateReadWorker(pVnode); pVnode->rworker = dnodeAllocateReadWorker(pVnode);
if (pVnode->replica == NULL) {
pVnode->status = TSDB_VN_STATUS_MASTER;
}
dPrint("vgroup:%d, vnode:%d is created", pVnode->vgId, pVnode->vgId); dPrint("vgroup:%d, vnode:%d is created", pVnode->vgId, pVnode->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void dnodeDoDropVnode(SVnodeObj *pVnode) { static void dnodeDoDropVnode(SVnodeObj *pVnode) {
if (pVnode->tsdb) { dnodeDoCleanupVnode(pVnode);
tsdbDropRepo(pVnode->tsdb);
pVnode->tsdb = NULL;
}
dnodeCleanupVnode(pVnode);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnode->vgId);
dPrint("pVnode:%p, vgroup:%d, drop file:%s from disk", pVnode, pVnode->vgId, rootDir);
// rmdir(rootDir);
} }
static void dnodeDropVnode(SVnodeObj *pVnode) { static void dnodeDropVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_NOT_READY; pVnode->status = TSDB_VN_STATUS_DELETING;
pVnode->dirty = true; dnodeReleaseVnode(pVnode);
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (count > 0) {
dTrace("vgroup:%d, vnode will be dropped until refcount:%d is 0", pVnode->vgId, count);
return;
}
dnodeDoDropVnode(pVnode);
} }
static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
...@@ -359,7 +378,7 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -359,7 +378,7 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
dTrace("vgroup:%d, start to create vnode:%d in dnode", pCreate->cfg.vgId, pCreate->cfg.vgId); dTrace("vgroup:%d, start to create vnode in dnode", pCreate->cfg.vgId);
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) { if (pVnodeObj != NULL) {
...@@ -378,13 +397,13 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -378,13 +397,13 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SMDDropVnodeMsg *pDrop = rpcMsg->pCont; SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId); pDrop->vgId = htonl(pDrop->vgId);
dTrace("vgroup:%d, start to drop vnode in dnode", pDrop->vgId);
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId); SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId);
if (pVnodeObj != NULL) { if (pVnodeObj != NULL) {
dPrint("pVnode:%p, vgroup:%d, start to drop vnode in dnode", pVnodeObj, pDrop->vgId);
dnodeDropVnode(pVnodeObj); dnodeDropVnode(pVnodeObj);
rpcRsp.code = TSDB_CODE_SUCCESS; rpcRsp.code = TSDB_CODE_SUCCESS;
} else { } else {
dTrace("vgroup:%d, failed drop vnode in dnode, vgroup not exist", pDrop->vgId);
rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID; rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID;
} }
...@@ -403,8 +422,10 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -403,8 +422,10 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) { if (pVnodeObj != NULL) {
dPrint("pVnode:%p, vgroup:%d, start to alter vnode in dnode", pVnodeObj, pCreate->cfg.vgId);
rpcRsp.code = TSDB_CODE_SUCCESS; rpcRsp.code = TSDB_CODE_SUCCESS;
} else { } else {
dTrace("vgroup:%d, alter vnode msg received, start to create vnode", pCreate->cfg.vgId);
rpcRsp.code = dnodeCreateVnode(pCreate);; rpcRsp.code = dnodeCreateVnode(pCreate);;
} }
...@@ -432,7 +453,8 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { ...@@ -432,7 +453,8 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
static void dnodeBuildVloadMsg(char *pNode, void * param) { static void dnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeObj *pVnode = (SVnodeObj *) pNode; SVnodeObj *pVnode = (SVnodeObj *) pNode;
if (pVnode->dirty) return; dPrint("===> pVnode:%p, vgroup:%d status:%s", pVnode, pVnode->vgId, taosGetVnodeStatusStr(pVnode->status));
if (pVnode->status == TSDB_VN_STATUS_DELETING) return;
SDMStatusMsg *pStatus = param; SDMStatusMsg *pStatus = param;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
...@@ -528,4 +550,3 @@ void dnodeUpdateDnodeId(int32_t dnodeId) { ...@@ -528,4 +550,3 @@ void dnodeUpdateDnodeId(int32_t dnodeId) {
dnodeSaveDnodeId(); dnodeSaveDnodeId();
} }
} }
...@@ -79,6 +79,12 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { ...@@ -79,6 +79,12 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
return; return;
} }
if (pMsg->pCont == NULL) {
rspMsg.code = TSDB_CODE_INVALID_MSG_LEN;
rpcSendResponse(&rspMsg);
return;
}
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else { } else {
......
...@@ -32,11 +32,11 @@ ...@@ -32,11 +32,11 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg); static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg);
static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void (*mgmtProcessDnodeMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); static void (*mgmtProcessDnodeMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void *tsMgmtDServerRpc; static void *tsMgmtDServerRpc;
static void *tsMgmtDServerQhandle = NULL;
int32_t mgmtInitDServer() { int32_t mgmtInitDServer() {
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
...@@ -56,11 +56,18 @@ int32_t mgmtInitDServer() { ...@@ -56,11 +56,18 @@ int32_t mgmtInitDServer() {
return -1; return -1;
} }
tsMgmtDServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS");
mPrint("server connection to dnode is opened"); mPrint("server connection to dnode is opened");
return 0; return 0;
} }
void mgmtCleanupDServer() { void mgmtCleanupDServer() {
if (tsMgmtDServerQhandle) {
taosCleanUpScheduler(tsMgmtDServerQhandle);
tsMgmtDServerQhandle = NULL;
}
if (tsMgmtDServerRpc) { if (tsMgmtDServerRpc) {
rpcClose(tsMgmtDServerRpc); rpcClose(tsMgmtDServerRpc);
tsMgmtDServerRpc = NULL; tsMgmtDServerRpc = NULL;
...@@ -72,14 +79,34 @@ void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { ...@@ -72,14 +79,34 @@ void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
mgmtProcessDnodeMsgFp[msgType] = fp; mgmtProcessDnodeMsgFp[msgType] = fp;
} }
static void mgmtProcessDServerRequest(SSchedMsg *sched) {
SRpcMsg *pMsg = sched->msg;
(*mgmtProcessDnodeMsgFp[pMsg->msgType])(pMsg);
rpcFreeCont(pMsg->pCont);
free(pMsg);
}
static void mgmtAddToDServerQueue(SRpcMsg *pMsg) {
SSchedMsg schedMsg;
schedMsg.msg = pMsg;
schedMsg.fp = mgmtProcessDServerRequest;
taosScheduleTask(tsMgmtDServerQhandle, &schedMsg);
}
static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) { static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
if (rpcMsg->pCont == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
return;
}
if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) { if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) {
(*mgmtProcessDnodeMsgFp[rpcMsg->msgType])(rpcMsg); SRpcMsg *pMsg = malloc(sizeof(SRpcMsg));
memcpy(pMsg, rpcMsg, sizeof(SRpcMsg));
mgmtAddToDServerQueue(pMsg);
} else { } else {
mError("%s is not processed in dserver", taosMsg[rpcMsg->msgType]); mError("%s is not processed in dserver", taosMsg[rpcMsg->msgType]);
rpcFreeCont(rpcMsg->pCont);
} }
rpcFreeCont(rpcMsg->pCont);
} }
static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
......
...@@ -144,19 +144,24 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -144,19 +144,24 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SDMStatusMsg *pStatus = rpcMsg->pCont; SDMStatusMsg *pStatus = rpcMsg->pCont;
pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->privateIp = htonl(pStatus->privateIp);
pStatus->publicIp = htonl(pStatus->publicIp);
pStatus->lastReboot = htonl(pStatus->lastReboot);
pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
pDnode = mgmtGetDnodeByIp(htonl(pStatus->privateIp)); pDnode = mgmtGetDnodeByIp(pStatus->privateIp);
if (pDnode == NULL) { if (pDnode == NULL) {
mTrace("dnode not created, privateIp:%s", taosIpStr(htonl(pStatus->privateIp))); mTrace("dnode not created, privateIp:%s", taosIpStr(pStatus->privateIp));
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
return; return;
} }
} else { } else {
pDnode = mgmtGetDnode(pStatus->dnodeId); pDnode = mgmtGetDnode(pStatus->dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
mError("dnode:%d, not exist, privateIp:%s", taosIpStr(pStatus->dnodeId), pStatus->dnodeName); mError("dnode:%d, not exist, privateIp:%s", pStatus->dnodeId, taosIpStr(pStatus->privateIp));
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
return; return;
} }
...@@ -169,16 +174,16 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -169,16 +174,16 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return ; return ;
} }
pDnode->privateIp = htonl(pStatus->privateIp); pDnode->privateIp = pStatus->privateIp;
pDnode->publicIp = htonl(pStatus->publicIp); pDnode->publicIp = pStatus->publicIp;
pDnode->lastReboot = htonl(pStatus->lastReboot); pDnode->lastReboot = pStatus->lastReboot;
pDnode->numOfCores = htons(pStatus->numOfCores); pDnode->numOfCores = pStatus->numOfCores;
pDnode->diskAvailable = pStatus->diskAvailable; pDnode->diskAvailable = pStatus->diskAvailable;
pDnode->alternativeRole = pStatus->alternativeRole; pDnode->alternativeRole = pStatus->alternativeRole;
pDnode->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); pDnode->numOfTotalVnodes = pStatus->numOfTotalVnodes;
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s, ", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
} }
int32_t openVnodes = htons(pStatus->openVnodes); int32_t openVnodes = htons(pStatus->openVnodes);
...@@ -191,7 +196,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -191,7 +196,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId); SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
mPrint("dnode:%d, vnode:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId); mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL); mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
} }
} }
......
...@@ -131,6 +131,10 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { ...@@ -131,6 +131,10 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
} }
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (rpcMsg == NULL || rpcMsg->pCont == NULL) {
return;
}
if (!mgmtInServerStatus()) { if (!mgmtInServerStatus()) {
mgmtProcessMsgWhileNotReady(rpcMsg); mgmtProcessMsgWhileNotReady(rpcMsg);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
...@@ -221,14 +225,17 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { ...@@ -221,14 +225,17 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
.handle = pMsg->thandle, .handle = pMsg->thandle,
.pCont = pShowRsp, .pCont = pShowRsp,
.contLen = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns, .contLen = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns,
.code = code, .code = code
.msgType = 0
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} else { } else {
mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, taosGetShowTypeStr(pShowMsg->type), tstrerror(code)); mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, taosGetShowTypeStr(pShowMsg->type), tstrerror(code));
mgmtFreeQhandle(pShow); mgmtFreeQhandle(pShow);
rpcFreeCont(pShowRsp); SRpcMsg rpcRsp = {
.handle = pMsg->thandle,
.code = code
};
rpcSendResponse(&rpcRsp);
} }
} }
......
...@@ -490,6 +490,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) { ...@@ -490,6 +490,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
while (1) { while (1) {
pLastNode = pNode;
pNode = sdbFetchRow(tsUserSdb, pNode, (void **)&pUser); pNode = sdbFetchRow(tsUserSdb, pNode, (void **)&pUser);
if (pUser == NULL) break; if (pUser == NULL) break;
......
...@@ -602,7 +602,7 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { ...@@ -602,7 +602,7 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
} }
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
mTrace("drop vnode msg is received"); mTrace("drop vnode rsp is received");
if (rpcMsg->handle == NULL) return; if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle; SQueuedMsg *queueMsg = rpcMsg->handle;
......
...@@ -41,14 +41,13 @@ enum _TSDB_DB_STATUS { ...@@ -41,14 +41,13 @@ enum _TSDB_DB_STATUS {
}; };
typedef enum _TSDB_VN_STATUS { typedef enum _TSDB_VN_STATUS {
TSDB_VN_STATUS_OFFLINE, TSDB_VN_STATUS_NOT_READY,
TSDB_VN_STATUS_CREATING,
TSDB_VN_STATUS_UNSYNCED, TSDB_VN_STATUS_UNSYNCED,
TSDB_VN_STATUS_SLAVE, TSDB_VN_STATUS_SLAVE,
TSDB_VN_STATUS_MASTER, TSDB_VN_STATUS_MASTER,
TSDB_VN_STATUS_CREATING,
TSDB_VN_STATUS_CLOSING, TSDB_VN_STATUS_CLOSING,
TSDB_VN_STATUS_DELETING, TSDB_VN_STATUS_DELETING,
TSDB_VN_STATUS_NOT_READY
} EVnodeStatus; } EVnodeStatus;
enum _TSDB_VN_SYNC_STATUS { enum _TSDB_VN_SYNC_STATUS {
......
...@@ -40,11 +40,11 @@ char* taosGetDbStatusStr(int32_t dbStatus) { ...@@ -40,11 +40,11 @@ char* taosGetDbStatusStr(int32_t dbStatus) {
char* taosGetVnodeStatusStr(int32_t vnodeStatus) { char* taosGetVnodeStatusStr(int32_t vnodeStatus) {
switch (vnodeStatus) { switch (vnodeStatus) {
case TSDB_VN_STATUS_OFFLINE: return "offline"; case TSDB_VN_STATUS_NOT_READY:return "not_ready";
case TSDB_VN_STATUS_CREATING: return "creating";
case TSDB_VN_STATUS_UNSYNCED: return "unsynced"; case TSDB_VN_STATUS_UNSYNCED: return "unsynced";
case TSDB_VN_STATUS_SLAVE: return "slave"; case TSDB_VN_STATUS_SLAVE: return "slave";
case TSDB_VN_STATUS_MASTER: return "master"; case TSDB_VN_STATUS_MASTER: return "master";
case TSDB_VN_STATUS_CREATING: return "creating";
case TSDB_VN_STATUS_CLOSING: return "closing"; case TSDB_VN_STATUS_CLOSING: return "closing";
case TSDB_VN_STATUS_DELETING: return "deleting"; case TSDB_VN_STATUS_DELETING: return "deleting";
default: return "undefined"; default: return "undefined";
......
...@@ -583,7 +583,8 @@ char *taosIpStr(uint32_t ipInt) { ...@@ -583,7 +583,8 @@ char *taosIpStr(uint32_t ipInt) {
static int ipStrIndex = 0; static int ipStrIndex = 0;
char *ipStr = ipStrArray[(ipStrIndex++) % 3]; char *ipStr = ipStrArray[(ipStrIndex++) % 3];
sprintf(ipStr, "0x%x:%u.%u.%u.%u", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24)); //sprintf(ipStr, "0x%x:%u.%u.%u.%u", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
return ipStr; return ipStr;
} }
......
...@@ -90,9 +90,9 @@ echo "logDir $LOG_DIR" >> $TAOS_CFG ...@@ -90,9 +90,9 @@ echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "publicIp $NODE_IP" >> $TAOS_CFG echo "publicIp $NODE_IP" >> $TAOS_CFG
echo "internalIp $NODE_IP" >> $TAOS_CFG echo "internalIp $NODE_IP" >> $TAOS_CFG
echo "privateIp $NODE_IP" >> $TAOS_CFG echo "privateIp $NODE_IP" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 199" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG echo "mDebugFlag 199" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 199" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册