提交 e8b77d3b 编写于 作者: S slguan

[TD-73] add some scripts

上级 2b83bfae
......@@ -25,6 +25,8 @@ int32_t mgmtInitDnodes();
void mgmtCleanUpDnodes();
int32_t mgmtGetDnodesNum();
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
void mgmtIncDnodeRef(SDnodeObj *pDnode);
void mgmtDecDnodeRef(SDnodeObj *pDnode);
SDnodeObj* mgmtGetDnode(int32_t dnodeId);
SDnodeObj* mgmtGetDnodeByIp(uint32_t ip);
......
......@@ -29,6 +29,7 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
float vnodeUsage = 1.0;
while (1) {
mgmtDecDnodeRef(pDnode);
pNode = mgmtGetNextDnode(pNode, &pDnode);
if (pDnode == NULL) break;
if (pDnode->numOfTotalVnodes <= 0) continue;
......
......@@ -42,6 +42,8 @@ extern int32_t clusterInit();
extern void clusterCleanUp();
extern int32_t clusterGetDnodesNum();
extern void * clusterGetNextDnode(void *pNode, void **pDnode);
extern void clusterIncDnodeRef(SDnodeObj *pDnode);
extern void clusterDecDnodeRef(SDnodeObj *pDnode);
extern SDnodeObj* clusterGetDnode(int32_t dnodeId);
extern SDnodeObj* clusterGetDnodeByIp(uint32_t ip);
#ifndef _CLUSTER
......@@ -118,6 +120,18 @@ int32_t mgmtGetDnodesNum() {
#endif
}
void mgmtIncDnodeRef(SDnodeObj *pDnode) {
#ifdef _CLUSTER
return clusterIncDnodeRef(pDnode);
#endif
}
void mgmtDecDnodeRef(SDnodeObj *pDnode) {
#ifdef _CLUSTER
return clusterDecDnodeRef(pDnode);
#endif
}
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode) {
#ifdef _CLUSTER
return clusterGetNextDnode(pNode, pDnode);
......@@ -183,6 +197,13 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
uint32_t version = htonl(pStatus->version);
if (version != tsVersion) {
mError("status msg version:%d not equal with mnode:%d", version, tsVersion);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_VERSION);
return ;
}
SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) {
pDnode = mgmtGetDnodeByIp(pStatus->privateIp);
......@@ -199,13 +220,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return;
}
}
uint32_t version = htonl(pStatus->version);
if (version != tsVersion) {
mError("dnode:%d, status msg version:%d not equal with mnode:%d", pDnode->dnodeId, version, tsVersion);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_VERSION);
return ;
}
pDnode->privateIp = pStatus->privateIp;
pDnode->publicIp = pStatus->publicIp;
......@@ -241,6 +255,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mgmtStartBalanceTimer(200);
}
mgmtDecDnodeRef(pDnode);
int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess);
SDMStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
......@@ -340,6 +356,8 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
mgmtDecUserRef(pUser);
return 0;
}
......@@ -351,6 +369,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
char ipstr[32];
while (numOfRows < rows) {
mgmtDecDnodeRef(pDnode);
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
......@@ -454,6 +473,7 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
mgmtDecUserRef(pUser);
return 0;
}
......@@ -466,6 +486,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
char ipstr[20];
while (numOfRows < rows) {
mgmtDecDnodeRef(pDnode);
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
......@@ -540,6 +561,7 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
mgmtDecUserRef(pUser);
return 0;
}
......@@ -654,6 +676,8 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtDecDnodeRef(pDnode);
mgmtDecUserRef(pUser);
return 0;
}
......
......@@ -120,6 +120,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
mgmtDecUserRef(pUser);
return 0;
}
......@@ -167,6 +168,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
......
......@@ -58,6 +58,7 @@ static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) {
if (pDnode) {
atomic_sub_fetch_32(&pDnode->openVnodes, 1);
}
mgmtDecDnodeRef(pDnode);
}
tfree(pOper->pObj);
......@@ -96,6 +97,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
pVgroup->vnodeGid[i].vnode = pVgroup->vgId;
atomic_add_fetch_32(&pDnode->openVnodes, 1);
mgmtDecDnodeRef(pDnode);
}
mgmtAddVgroupIntoDb(pVgroup);
......@@ -295,10 +297,10 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
return TSDB_CODE_INVALID_TABLE_ID;
}
mgmtDecTableRef(pTable);
pVgroup = mgmtGetVgroup(((SChildTableObj*)pTable)->vgId);
if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
mgmtDecTableRef(pTable);
mgmtDecVgroupRef(pVgroup);
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
} else {
SVgObj *pVgroup = pDb->pHead;
......@@ -350,6 +352,8 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pShow->pNode = pVgroup;
}
mgmtDecDbRef(pDb);
return 0;
}
......@@ -359,6 +363,7 @@ char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
mError("vgroup:%d, not exist in dnode:%d", pVgroup->vgId, pDnode->dnodeId);
return "null";
}
mgmtDecDnodeRef(pDnode);
if (pDnode->status == TSDB_DN_STATUS_OFFLINE) {
return "offline";
......@@ -438,6 +443,8 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
}
pShow->numOfReads += numOfRows;
mgmtDecDbRef(pDb);
return numOfRows;
}
......@@ -634,13 +641,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
code = TSDB_CODE_SDB_ERROR;
}
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType;
newMsg->thandle = queueMsg->thandle;
newMsg->pUser = queueMsg->pUser;
newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen);
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg);
mgmtAddToShellQueue(newMsg);
queueMsg->pCont = NULL;
......@@ -660,6 +661,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
return;
}
mgmtDecDnodeRef(pDnode);
SVgObj *pVgroup = mgmtGetVgroup(pCfg->vgId);
if (pVgroup == NULL) {
......@@ -667,6 +669,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
return;
}
mgmtDecVgroupRef(pVgroup);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
......@@ -682,6 +685,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
SVgObj *pVgroup = NULL;
while (1) {
mgmtDecVgroupRef(pVgroup);
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
if (pVgroup == NULL) break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册