提交 c95b1eec 编写于 作者: S slguan

fix bug while balance

上级 112ee168
...@@ -33,7 +33,6 @@ static int32_t dnodeOpenVnodes(); ...@@ -33,7 +33,6 @@ static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes(); static void dnodeCloseVnodes();
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
...@@ -41,7 +40,6 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); ...@@ -41,7 +40,6 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitMgmt() { int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
...@@ -146,7 +144,14 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -146,7 +144,14 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp); pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp);
} }
return vnodeCreate(pCreate); void *pVnode = vnodeAccquireVnode(pCreate->cfg.vgId);
if (pVnode != NULL) {
int32_t code = vnodeAlter(pVnode, pCreate);
vnodeRelease(pVnode);
return code;
} else {
return vnodeCreate(pCreate);
}
} }
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
...@@ -156,28 +161,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -156,28 +161,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
return vnodeDrop(pDrop->vgId); return vnodeDrop(pDrop->vgId);
} }
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.maxCacheSize = htobe64(pCreate->cfg.maxCacheSize);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp);
}
return 0;
}
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
// SMDAlterStreamMsg *pStream = pCont; // SMDAlterStreamMsg *pStream = pCont;
// pStream->uid = htobe64(pStream->uid); // pStream->uid = htobe64(pStream->uid);
......
...@@ -33,7 +33,6 @@ int32_t dnodeInitMnode() { ...@@ -33,7 +33,6 @@ int32_t dnodeInitMnode() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeWrite; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeMgmt; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeMgmt;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeMgmt;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeMgmt; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeMgmt;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeMgmt; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeMgmt;
......
...@@ -48,14 +48,12 @@ extern "C" { ...@@ -48,14 +48,12 @@ extern "C" {
#define TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP 16 #define TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP 16
#define TSDB_MSG_TYPE_MD_DROP_VNODE 17 #define TSDB_MSG_TYPE_MD_DROP_VNODE 17
#define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18 #define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18
#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19 #define TSDB_MSG_TYPE_MD_DROP_STABLE 19
#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20 #define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 20
#define TSDB_MSG_TYPE_MD_DROP_STABLE 21 #define TSDB_MSG_TYPE_MD_ALTER_STREAM 21
#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22 #define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 22
#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23 #define TSDB_MSG_TYPE_MD_CONFIG_DNODE 23
#define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 24 #define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 24
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26
// message from client to mnode // message from client to mnode
#define TSDB_MSG_TYPE_CM_CONNECT 31 #define TSDB_MSG_TYPE_CM_CONNECT 31
...@@ -512,6 +510,7 @@ typedef struct { ...@@ -512,6 +510,7 @@ typedef struct {
uint8_t status; uint8_t status;
uint8_t role; uint8_t role;
uint8_t accessState; uint8_t accessState;
uint8_t replica;
uint8_t reserved[5]; uint8_t reserved[5];
} SVnodeLoad; } SVnodeLoad;
......
...@@ -38,6 +38,7 @@ typedef struct { ...@@ -38,6 +38,7 @@ typedef struct {
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode); void vnodeRelease(void *pVnode);
......
...@@ -47,6 +47,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); ...@@ -47,6 +47,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip); SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
......
...@@ -335,6 +335,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -335,6 +335,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
} else {
mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess);
} }
int32_t openVnodes = htons(pStatus->openVnodes); int32_t openVnodes = htons(pStatus->openVnodes);
...@@ -349,11 +351,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -349,11 +351,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL);
} else { } else {
mgmtUpdateVgroupStatus(pVgroup, pDnode, pVload); mgmtUpdateVgroupStatus(pVgroup, pDnode, pVload);
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
pVgroup->totalStorage = htobe64(pVload->totalStorage);
pVgroup->compStorage = htobe64(pVload->compStorage);
pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
}
mgmtDecVgroupRef(pVgroup); mgmtDecVgroupRef(pVgroup);
} }
} }
......
...@@ -44,9 +44,7 @@ static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -44,9 +44,7 @@ static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, vo
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) ; static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) ;
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
static int32_t mgmtVgroupActionDestroy(SSdbOper *pOper) { static int32_t mgmtVgroupActionDestroy(SSdbOper *pOper) {
SVgObj *pVgroup = pOper->pObj; SVgObj *pVgroup = pOper->pObj;
...@@ -124,9 +122,25 @@ static int32_t mgmtVgroupActionDelete(SSdbOper *pOper) { ...@@ -124,9 +122,25 @@ static int32_t mgmtVgroupActionDelete(SSdbOper *pOper) {
static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) { static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
SVgObj *pNew = pOper->pObj; SVgObj *pNew = pOper->pObj;
SVgObj *pVgroup = mgmtGetVgroup(pNew->vgId); SVgObj *pVgroup = mgmtGetVgroup(pNew->vgId);
if (pVgroup != pNew) { if (pVgroup != pNew) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = pVgroup->vnodeGid[i].pDnode;
if (pDnode != NULL) {
atomic_sub_fetch_32(&pDnode->openVnodes, 1);
}
}
memcpy(pVgroup, pNew, pOper->rowSize); memcpy(pVgroup, pNew, pOper->rowSize);
free(pNew); free(pNew);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
pVgroup->vnodeGid[i].pDnode = pDnode;
if (pDnode != NULL) {
atomic_add_fetch_32(&pDnode->openVnodes, 1);
}
}
} }
int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool); int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool);
...@@ -232,6 +246,7 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) { ...@@ -232,6 +246,7 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) {
} }
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVload) { void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVload) {
bool dnodeExist = false;
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->pDnode == pDnode) { if (pVgid->pDnode == pDnode) {
...@@ -239,9 +254,29 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo ...@@ -239,9 +254,29 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo
if (pVload->role == TAOS_SYNC_ROLE_MASTER) { if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
pVgroup->inUse = i; pVgroup->inUse = i;
} }
dnodeExist = true;
break; break;
} }
} }
if (!dnodeExist) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
mError("vgroup:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId);
mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL);
return;
}
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
pVgroup->totalStorage = htobe64(pVload->totalStorage);
pVgroup->compStorage = htobe64(pVload->compStorage);
pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
}
if (pVload->replica != pVgroup->numOfVnodes) {
mError("dnode:%d, vgroup:%d replica:%d not match with mgmt:%d", pDnode->dnodeId, pVload->vgId, pVload->replica,
pVgroup->numOfVnodes);
mgmtSendCreateVgroupMsg(pVgroup, NULL);
}
} }
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
...@@ -521,7 +556,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { ...@@ -521,7 +556,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
SMDVnodeDesc *pNodes = pVnode->nodes; SMDVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
SDnodeObj *pDnode = pVgroup->vnodeGid[0].pDnode; SDnodeObj *pDnode = pVgroup->vnodeGid[j].pDnode;
if (pDnode != NULL) { if (pDnode != NULL) {
pNodes[j].nodeId = htonl(pDnode->dnodeId); pNodes[j].nodeId = htonl(pDnode->dnodeId);
pNodes[j].nodeIp = htonl(pDnode->privateIp); pNodes[j].nodeIp = htonl(pDnode->privateIp);
......
...@@ -170,6 +170,8 @@ char *taosIpStr(uint32_t ipInt); ...@@ -170,6 +170,8 @@ char *taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr); uint32_t ip2uint(const char *const ip_addr);
void taosRemoveDir(char *rootDir);
#define TAOS_ALLOC_MODE_DEFAULT 0 #define TAOS_ALLOC_MODE_DEFAULT 0
#define TAOS_ALLOC_MODE_RANDOM_FAIL 1 #define TAOS_ALLOC_MODE_RANDOM_FAIL 1
#define TAOS_ALLOC_MODE_DETECT_LEAK 2 #define TAOS_ALLOC_MODE_DETECT_LEAK 2
......
...@@ -662,4 +662,28 @@ void tzfree(void *ptr) { ...@@ -662,4 +662,28 @@ void tzfree(void *ptr) {
if (ptr) { if (ptr) {
free((void *)((char *)ptr - sizeof(size_t))); free((void *)((char *)ptr - sizeof(size_t)));
} }
}
void taosRemoveDir(char *rootDir) {
DIR *dir = opendir(rootDir);
if (dir == NULL) return;
struct dirent *de = NULL;
while ((de = readdir(dir)) != NULL) {
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
char filename[1024];
snprintf(filename, 1023, "%s/%s", rootDir, de->d_name);
if (de->d_type & DT_DIR) {
taosRemoveDir(filename);
} else {
remove(filename);
uPrint("file:%s is removed", filename);
}
}
closedir(dir);
rmdir(rootDir);
uPrint("dir:%s is removed", rootDir);
} }
\ No newline at end of file
...@@ -135,6 +135,39 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -135,6 +135,39 @@ int32_t vnodeDrop(int32_t vgId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = param;
int32_t code = vnodeSaveCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) {
dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
return code;
}
code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return code;
}
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
if (code != TSDB_CODE_SUCCESS) {
dTrace("pVnode:%p vgId:%d, failed to alter vnode, canot reconfig sync, result:%s", pVnode, pVnode->vgId,
tstrerror(code));
return code;
}
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
if (code != TSDB_CODE_SUCCESS) {
dTrace("pVnode:%p vgId:%d, failed to alter vnode, canot reconfig tsdb, result:%s", pVnode, pVnode->vgId,
tstrerror(code));
return code;
}
dTrace("pVnode:%p vgId:%d, vnode is altered", pVnode, pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeOpen(int32_t vnode, char *rootDir) { int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN]; char temp[TSDB_FILENAME_LEN];
pthread_once(&vnodeModuleInit, vnodeInit); pthread_once(&vnodeModuleInit, vnodeInit);
...@@ -159,7 +192,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -159,7 +192,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->rqueue = dnodeAllocateRqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode);
sprintf(temp, "%s/wal", rootDir); sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, &pVnode->walCfg); pVnode->wal = walOpen(temp, &pVnode->walCfg);
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = pVnode->vgId; syncInfo.vgId = pVnode->vgId;
...@@ -172,10 +205,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -172,10 +205,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.confirmForward = dnodeSendRpcWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
pVnode->sync = syncStart(&syncInfo); pVnode->sync = syncStart(&syncInfo);
pVnode->events = NULL; pVnode->events = NULL;
pVnode->cq = NULL; pVnode->cq = NULL;
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
...@@ -233,7 +266,9 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -233,7 +266,9 @@ void vnodeRelease(void *pVnodeRaw) {
pVnode->wqueue = NULL; pVnode->wqueue = NULL;
if (pVnode->status == TAOS_VN_STATUS_DELETING) { if (pVnode->status == TAOS_VN_STATUS_DELETING) {
// remove the whole directory char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
taosRemoveDir(rootDir);
} }
free(pVnode); free(pVnode);
...@@ -252,7 +287,8 @@ void *vnodeGetVnode(int32_t vgId) { ...@@ -252,7 +287,8 @@ void *vnodeGetVnode(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId); SVnodeObj **ppVnode = (SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (ppVnode == NULL || *ppVnode == NULL) { if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_INVALID_VGROUP_ID; terrno = TSDB_CODE_INVALID_VGROUP_ID;
assert(false); dError("vgId:%d not exist");
return NULL;
} }
return *ppVnode; return *ppVnode;
...@@ -298,6 +334,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { ...@@ -298,6 +334,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
pLoad->vgId = htonl(pVnode->vgId); pLoad->vgId = htonl(pVnode->vgId);
pLoad->status = pVnode->status; pLoad->status = pVnode->status;
pLoad->role = pVnode->role; pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica;
} }
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
......
...@@ -93,7 +93,7 @@ echo "privateIp $NODE_IP" >> $TAOS_CFG ...@@ -93,7 +93,7 @@ echo "privateIp $NODE_IP" >> $TAOS_CFG
echo "dDebugFlag 199" >> $TAOS_CFG echo "dDebugFlag 199" >> $TAOS_CFG
echo "mDebugFlag 199" >> $TAOS_CFG echo "mDebugFlag 199" >> $TAOS_CFG
echo "sdbDebugFlag 199" >> $TAOS_CFG echo "sdbDebugFlag 199" >> $TAOS_CFG
echo "rpcDebugFlag 131" >> $TAOS_CFG echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 131" >> $TAOS_CFG echo "httpDebugFlag 131" >> $TAOS_CFG
......
...@@ -25,15 +25,15 @@ system sh/cfg.sh -n dnode2 -c mgmtEqualVnodeNum -v 4 ...@@ -25,15 +25,15 @@ system sh/cfg.sh -n dnode2 -c mgmtEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mgmtEqualVnodeNum -v 4 system sh/cfg.sh -n dnode3 -c mgmtEqualVnodeNum -v 4
system sh/cfg.sh -n dnode4 -c mgmtEqualVnodeNum -v 4 system sh/cfg.sh -n dnode4 -c mgmtEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c clog -v 1 system sh/cfg.sh -n dnode1 -c clog -v 2
system sh/cfg.sh -n dnode2 -c clog -v 1 system sh/cfg.sh -n dnode2 -c clog -v 2
system sh/cfg.sh -n dnode3 -c clog -v 1 system sh/cfg.sh -n dnode3 -c clog -v 2
system sh/cfg.sh -n dnode4 -c clog -v 1 system sh/cfg.sh -n dnode4 -c clog -v 2
system sh/cfg.sh -n dnode1 -c clog -v 1 system sh/cfg.sh -n dnode1 -c clog -v 2
system sh/cfg.sh -n dnode2 -c clog -v 1 system sh/cfg.sh -n dnode2 -c clog -v 2
system sh/cfg.sh -n dnode3 -c clog -v 1 system sh/cfg.sh -n dnode3 -c clog -v 2
system sh/cfg.sh -n dnode4 -c clog -v 1 system sh/cfg.sh -n dnode4 -c clog -v 2
print ========== step1 print ========== step1
system sh/exec_up.sh -n dnode1 -s start system sh/exec_up.sh -n dnode1 -s start
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册