提交 1ddbe3ca 编写于 作者: A Alex Duan

feat(mnode): modify server flow

上级 2d44f233
...@@ -438,6 +438,11 @@ do { \ ...@@ -438,6 +438,11 @@ do { \
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P #define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
//define show cluster status and show db.status
#define SHOW_STATUS_NOT_AVAILABLE 0
#define SHOW_STATUS_AVAILABLE 1
#define SHOW_STATUS_HALF_AVAILABLE 2
typedef enum { typedef enum {
TAOS_QTYPE_RPC = 0, TAOS_QTYPE_RPC = 0,
TAOS_QTYPE_FWD = 1, TAOS_QTYPE_FWD = 1,
......
...@@ -60,7 +60,8 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -60,7 +60,8 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
// status // status
static int32_t mnodeGetStatusMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetStatusMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStatus(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveStatusDb(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeRetrieveStatusCluster(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg); static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
...@@ -241,11 +242,11 @@ int32_t mnodeInitVgroups() { ...@@ -241,11 +242,11 @@ int32_t mnodeInitVgroups() {
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup);
// cluster status // cluster status
mnodeAddShowMetaHandle(TSDB_MGMT_STATUS_CLUSTER, mnodeGetStatusMeta); mnodeAddShowMetaHandle(TSDB_MGMT_STATUS_CLUSTER, mnodeGetStatusMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_STATUS_CLUSTER, mnodeRetrieveStatus); mnodeAddShowRetrieveHandle(TSDB_MGMT_STATUS_CLUSTER, mnodeRetrieveStatusCluster);
mnodeAddShowFreeIterHandle(TSDB_MGMT_STATUS_CLUSTER, mnodeCancelGetNextVgroup); mnodeAddShowFreeIterHandle(TSDB_MGMT_STATUS_CLUSTER, mnodeCancelGetNextVgroup);
// db status // db status
mnodeAddShowMetaHandle(TSDB_MGMT_STATUS_DB, mnodeGetStatusMeta); mnodeAddShowMetaHandle(TSDB_MGMT_STATUS_DB, mnodeGetStatusMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_STATUS_DB, mnodeRetrieveStatus); mnodeAddShowRetrieveHandle(TSDB_MGMT_STATUS_DB, mnodeRetrieveStatusDb);
mnodeAddShowFreeIterHandle(TSDB_MGMT_STATUS_DB, mnodeCancelGetNextVgroup); mnodeAddShowFreeIterHandle(TSDB_MGMT_STATUS_DB, mnodeCancelGetNextVgroup);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp); mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp);
...@@ -793,164 +794,84 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -793,164 +794,84 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
} }
static int32_t mnodeGetStatusMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mnodeGetStatusMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) {
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
mnodeDecDbRef(pDb);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
pShow->bytes[0] = sizeof(int32_t);
pSchema[0].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[0].name, "status");
pSchema[0].bytes = htons(pShow->bytes[0]);
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "vgId");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tables");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "onlines");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->maxReplica = 1;
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
SVgObj *pVgroup = pDb->vgList[v];
if (pVgroup != NULL) {
pShow->maxReplica = pVgroup->numOfVnodes > pShow->maxReplica ? pVgroup->numOfVnodes : pShow->maxReplica;
}
}
for (int32_t i = 0; i < pShow->maxReplica; ++i) {
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_status", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "compacting");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = pDb->numOfVgroups;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mnodeDecDbRef(pDb);
return 0; return 0;
} }
static int32_t mnodeRetrieveStatus(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t mnodeRetrieveStatus(SShowObj *pShow, char *data, int32_t rows, void *pConn, SDbObj* pDb) {
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t cols = 0; int32_t nAvailble = 0;
char * pWrite; int32_t nUnAvailble = 0;
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
mnodeDecDbRef(pDb);
return 0;
}
// get status
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextVgroup(pShow->pIter, &pVgroup); pShow->pIter = mnodeGetNextVgroup(pShow->pIter, &pVgroup);
if (pVgroup == NULL) break; if (pVgroup == NULL) break;
if (pVgroup->pDb != pDb) { // if pDb is not null
if ( pDb && pVgroup->pDb != pDb) {
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
continue; continue;
} }
cols = 0; // status
if(pVgroup->status != TAOS_VG_STATUS_READY ) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; // vgroup is not ready
*(int32_t *) pWrite = pVgroup->vgId; nUnAvailble ++;
cols++; continue;
}
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *) pWrite = pVgroup->numOfTables;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* status = vgroupStatus[pVgroup->status];
STR_TO_VARSTR(pWrite, status);
cols++;
int32_t onlineVnodes = 0; // check master
bool master = false;
for (int32_t i = 0; i < pShow->maxReplica; ++i) { for (int32_t i = 0; i < pShow->maxReplica; ++i) {
if (pVgroup->vnodeGid[i].role == TAOS_SYNC_ROLE_SLAVE || pVgroup->vnodeGid[i].role == TAOS_SYNC_ROLE_MASTER) { if (pVgroup->vnodeGid[i].role == TAOS_SYNC_ROLE_MASTER) {
onlineVnodes++; master = true;
break;
}
} }
if (master)
nAvailble++;
else
nUnAvailble++;
} }
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; // set col data
*(int32_t *)pWrite = onlineVnodes; int32_t status = 0;
cols++; if (nAvailble + nUnAvailble == 0 || nUnAvailble == 0) {
status = SHOW_STATUS_AVAILABLE;
} else if (nAvailble > 0 && nUnAvailble > 0) {
status = SHOW_STATUS_HALF_AVAILABLE;
} else {
status = SHOW_STATUS_NOT_AVAILABLE;
}
for (int32_t i = 0; i < pShow->maxReplica; ++i) { *(int32_t* )data = status;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pShow->numOfReads = 1;
*(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId;
cols++;
SDnodeObj * pDnode = pVgroup->vnodeGid[i].pDnode; return 1;
const char *role = "NULL"; }
if (pDnode != NULL) {
role = syncRole[pVgroup->vnodeGid[i].role];
}
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; static int32_t mnodeRetrieveStatusCluster(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]); return mnodeRetrieveStatus(pShow, data, rows, pConn, NULL);
cols++; }
}
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; static int32_t mnodeRetrieveStatusDb(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
*(int8_t *)pWrite = pVgroup->compact;
cols++;
mnodeDecVgroupRef(pVgroup);
numOfRows++;
}
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
pShow->numOfReads += numOfRows; if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
mnodeDecDbRef(pDb);
return 0;
}
int32_t numOfRows = mnodeRetrieveStatus(pShow, data, rows, pConn, pDb);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
return numOfRows; return numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册