提交 58607175 编写于 作者: J Jeff Tao

optimize the code in dnodeMgmt, so it can close vnode one by one when dnode cleans up

remove the RPC warnings
上级 67acb1f3
......@@ -32,6 +32,7 @@
#include "vnode.h"
static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes();
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
......@@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() {
return -1;
}
if ( vnodeInitModule() != TSDB_CODE_SUCCESS) {
return -1;
}
int32_t code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) {
return -1;
......@@ -88,7 +85,7 @@ void dnodeCleanupMgmt() {
tsDnodeTmr = NULL;
}
vnodeCleanupModule();
dnodeCloseVnodes();
}
void dnodeMgmt(SRpcMsg *pMsg) {
......@@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont);
}
static int32_t dnodeOpenVnodes() {
static int dnodeGetVnodeList(int32_t vnodeList[]) {
DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) {
return TSDB_CODE_NO_WRITE_ACCESS;
......@@ -122,20 +119,44 @@ static int32_t dnodeOpenVnodes() {
int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue;
char vnodeDir[TSDB_FILENAME_LEN * 3];
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name);
int32_t code = vnodeOpen(vnode, vnodeDir);
if (code == 0) {
vnodeList[numOfVnodes] = vnode;
numOfVnodes++;
}
}
}
closedir(dir);
dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes);
return numOfVnodes;
}
static int32_t dnodeOpenVnodes() {
char vnodeDir[TSDB_FILENAME_LEN * 3];
int failed = 0;
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
int numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int i=0; i<numOfVnodes; ++i) {
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
if (vnodeOpen(vnodeList[i], vnodeDir) <0) failed++;
}
free(vnodeList);
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
return TSDB_CODE_SUCCESS;
}
static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
int numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int i=0; i<numOfVnodes; ++i)
vnodeClose(vnodeList[i]);
free(vnodeList);
dPrint("total vnodes:%d are all closed", numOfVnodes);
}
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
......
......@@ -25,13 +25,10 @@ typedef struct {
void *rsp;
} SRspRet;
int32_t vnodeInitModule();
void vnodeCleanupModule();
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vnode, char *rootDir);
int32_t vnodeClose(void *pVnode);
int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeGetVnode(int32_t vgId);
......
......@@ -263,7 +263,6 @@ void *rpcOpen(SRpcInit *pInit) {
}
if (pRpc->connType == TAOS_CONN_SERVER) {
// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label);
......@@ -535,8 +534,7 @@ static void rpcCloseConn(void *thandle) {
if ( pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0};
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
// taosDeleteStrHash(pRpc->hash, hashstr);
// taosHashRemove(pRpc->hash, hashstr, size);
taosHashRemove(pRpc->hash, hashstr, size);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL;
......@@ -588,7 +586,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated
// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
if (ppConn) pConn = *ppConn;
if (pConn) return pConn;
......@@ -621,7 +618,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->localPort = (pRpc->localPort + pRpc->index);
}
// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
......@@ -834,13 +830,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn->inType) {
// if there are pending request, notify the app
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn);
/*
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
// (*(pRpc->cfp))(&rpcMsg);
(*(pRpc->cfp))(&rpcMsg);
*/
}
rpcCloseConn(pConn);
......@@ -1163,13 +1161,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
if (pConn->inType && pRpc->cfp) {
// if there are pending request, notify the app
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
/*
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
// (*(pRpc->cfp))(&rpcMsg);
(*(pRpc->cfp))(&rpcMsg);
*/
}
rpcCloseConn(pConn);
} else {
......
......@@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param);
int32_t vnodeInitModule() {
static int tsOpennedVnodes;
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
static void vnodeInit() {
vnodeInitWriteFp();
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
if (tsDnodeVnodesHash == NULL) {
dError("failed to init vnode list");
return -1;
}
return 0;
}
typedef void (*CleanupFp)(char *);
void vnodeCleanupModule() {
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose);
taosCleanUpIntHash(tsDnodeVnodesHash);
}
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
......@@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) {
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN];
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj vnodeObj = {0};
vnodeObj.vgId = vnode;
......@@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->status = VN_STATUS_READY;
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
tsOpennedVnodes++;
return TSDB_CODE_SUCCESS;
}
int32_t vnodeClose(void *param) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int32_t vnodeClose(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) return 0;
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
pVnode->status = VN_STATUS_CLOSING;
......@@ -183,6 +182,12 @@ void vnodeRelease(void *pVnodeRaw) {
}
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
tsOpennedVnodes--;
if (tsOpennedVnodes <= 0) {
taosCleanUpIntHash(tsDnodeVnodesHash);
vnodeModuleInit = PTHREAD_ONCE_INIT;
}
}
void *vnodeGetVnode(int32_t vgId) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册