未验证 提交 ed596901 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1516 from taosdata/refactor/cluster

Refactor/cluster
...@@ -48,7 +48,7 @@ int32_t mgmtInitMnodes() { ...@@ -48,7 +48,7 @@ int32_t mgmtInitMnodes() {
void mgmtCleanupMnodes() {} void mgmtCleanupMnodes() {}
bool mgmtInServerStatus() { return tsMnodeObj.status == TSDB_MN_STATUS_SERVING; } bool mgmtInServerStatus() { return tsMnodeObj.status == TSDB_MN_STATUS_SERVING; }
bool mgmtIsMaster() { return tsMnodeObj.role == TSDB_MN_ROLE_MASTER; } bool mgmtIsMaster() { return tsMnodeObj.role == TSDB_MN_ROLE_MASTER; }
bool mgmtCheckRedirect(void *handle) { return false; } bool mgmtCheckRedirect(void *thandle) { return false; }
static int32_t mgmtGetMnodesNum() { static int32_t mgmtGetMnodesNum() {
return 1; return 1;
...@@ -117,7 +117,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -117,7 +117,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
} }
pShow->numOfRows = mgmtGetDnodesNum(); pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL; pShow->pNode = NULL;
......
...@@ -40,7 +40,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon ...@@ -40,7 +40,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont); static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg);
static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); static void mgmtProcessMsgFromShell(SRpcMsg *pMsg);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg);
static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg); static void mgmtProcessMsgWhileNotReady(SRpcMsg *rpcMsg);
...@@ -135,7 +135,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -135,7 +135,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
} }
if (mgmtCheckRedirect(rpcMsg->handle)) { if (mgmtCheckRedirect(rpcMsg->handle)) {
// send resp in redirect func // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect());
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
...@@ -165,7 +165,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -165,7 +165,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return; return;
} }
if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { if (mgmtCheckMsgReadOnly(pMsg)) {
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg); (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg);
mgmtFreeQueuedMsg(pMsg); mgmtFreeQueuedMsg(pMsg);
} else { } else {
...@@ -185,7 +185,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { ...@@ -185,7 +185,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
return; return;
} }
if (!tsMgmtShowMetaFp[pShowMsg->type]) { if (!tsMgmtShowMetaFp[pShowMsg->type] || !tsMgmtShowRetrieveFp[pShowMsg->type]) {
mError("show type:%s is not support", taosGetShowTypeStr(pShowMsg->type)); mError("show type:%s is not support", taosGetShowTypeStr(pShowMsg->type));
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT);
return; return;
...@@ -299,22 +299,13 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { ...@@ -299,22 +299,13 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
} }
static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
//SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont;
//mgmtSaveQueryStreamList(pHBMsg);
SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
if (pHBRsp == NULL) { if (pHBRsp == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
return; return;
} }
SRpcConnInfo connInfo; if (pMsg->usePublicIp) {
if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) {
mError("conn:%p is already released while process heart beat msg", pMsg->thandle);
return;
}
if (connInfo.serverIp == tsPublicIpInt) {
mgmtGetMnodePublicIpList(&pHBRsp->ipList); mgmtGetMnodePublicIpList(&pHBRsp->ipList);
} else { } else {
mgmtGetMnodePrivateIpList(&pHBRsp->ipList); mgmtGetMnodePrivateIpList(&pHBRsp->ipList);
...@@ -424,10 +415,10 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { ...@@ -424,10 +415,10 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
SCMUseDbMsg *pUseDbMsg = pMsg->pCont; SCMUseDbMsg *pUseDbMsg = pMsg->pCont;
// todo check for priority of current user // todo check for priority of current user
SDbObj* pDbObj = mgmtGetDb(pUseDbMsg->db); pMsg->pDb = mgmtGetDb(pUseDbMsg->db);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pDbObj == NULL) { if (pMsg->pDb == NULL) {
code = TSDB_CODE_INVALID_DB; code = TSDB_CODE_INVALID_DB;
} }
...@@ -438,26 +429,29 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { ...@@ -438,26 +429,29 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
/** /**
* check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one. * check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one.
*/ */
static bool mgmtCheckMeterMetaMsgType(void *pMsg) { static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) {
SCMTableInfoMsg *pInfo = (SCMTableInfoMsg *) pMsg; SCMTableInfoMsg *pInfo = pMsg->pCont;
int16_t autoCreate = htons(pInfo->createFlag); pMsg->pTable = mgmtGetTable(pInfo->tableId);
STableInfo *pTable = mgmtGetTable(pInfo->tableId); if (pMsg->pTable != NULL) return true;
// If table does not exists and autoCreate flag is set, we add the handler into task queue // If table does not exists and autoCreate flag is set, we add the handler into task queue
bool addIntoTranQueue = (pTable == NULL && autoCreate == 1); int16_t autoCreate = htons(pInfo->createFlag);
if (addIntoTranQueue) { if (autoCreate == 1) {
mTrace("table:%s auto created task added", pInfo->tableId); mTrace("table:%s auto created task added", pInfo->tableId);
return false;
} }
mgmtDecTableRef(pTable); return true;
return addIntoTranQueue;
} }
static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) {
if ((type == TSDB_MSG_TYPE_CM_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) || if (pMsg->msgType == TSDB_MSG_TYPE_CM_TABLE_META) {
type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_RETRIEVE || return mgmtCheckTableMetaMsgReadOnly(pMsg);
type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_CM_TABLES_META || }
type == TSDB_MSG_TYPE_CM_CONNECT) {
if (pMsg->msgType == TSDB_MSG_TYPE_CM_STABLE_VGROUP || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE ||
pMsg->msgType == TSDB_MSG_TYPE_CM_SHOW || pMsg->msgType == TSDB_MSG_TYPE_CM_TABLES_META ||
pMsg->msgType == TSDB_MSG_TYPE_CM_CONNECT) {
return true; return true;
} }
......
此差异已折叠。
################################# #################################
run general/user/basic1.sim run general/user/basic1.sim
run general/show/dnodes.sim
run general/db/basic1.sim run general/db/basic1.sim
run general/db/basic2.sim
run general/db/basic3.sim
run general/db/basic4.sim
run general/db/basic5.sim
run general/table/basic1.sim run general/table/basic1.sim
run general/table/basic2.sim
run general/table/basic3.sim
################################## ##################################
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册