提交 e63637fc 编写于 作者: S Shengliang Guan

TD-10431 rename some functions

上级 6ee88322
......@@ -24,8 +24,8 @@ extern "C" {
int32_t mndInitMnode();
void mndCleanupMnode();
void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect);
void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect);
void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect);
void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect);
#ifdef __cplusplus
}
......
......@@ -24,9 +24,8 @@ extern "C" {
int32_t mndInitSync();
void mndCleanupSync();
int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData);
bool mnodeIsMaster();
bool mndIsMaster();
int32_t mndSyncPropose(SSdbRaw *pRaw, void *pData);
#ifdef __cplusplus
}
......
......@@ -20,5 +20,5 @@
int32_t mndInitMnode() { return 0; }
void mndCleanupMnode() {}
void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {}
void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {}
\ No newline at end of file
void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {}
void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {}
\ No newline at end of file
......@@ -21,10 +21,10 @@
int32_t mndInitSync() { return 0; }
void mndCleanupSync() {}
int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData) {
int32_t mndSyncPropose(SSdbRaw *pRaw, void *pData) {
trnApply(pData, pData, 0);
free(pData);
return 0;
}
bool mnodeIsMaster() { return true; }
\ No newline at end of file
bool mndIsMaster() { return true; }
\ No newline at end of file
......@@ -36,9 +36,9 @@ static struct {
char email[TSDB_FQDN_LEN];
} tsTelem;
static void mnodeBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); }
static void mndBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); }
static void mnodeCloseObject(SBufferWriter* bw) {
static void mndCloseObject(SBufferWriter* bw) {
size_t len = tbufTell(bw);
if (tbufGetData(bw, false)[len - 1] == ',') {
tbufWriteCharAt(bw, len - 1, '}');
......@@ -64,14 +64,14 @@ static void closeArray(SBufferWriter* bw) {
}
#endif
static void mnodeWriteString(SBufferWriter* bw, const char* str) {
static void mndWriteString(SBufferWriter* bw, const char* str) {
tbufWriteChar(bw, '"');
tbufWrite(bw, str, strlen(str));
tbufWriteChar(bw, '"');
}
static void mnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
mnodeWriteString(bw, k);
static void mndAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
mndWriteString(bw, k);
tbufWriteChar(bw, ':');
char buf[32];
sprintf(buf, "%" PRId64, v);
......@@ -79,14 +79,14 @@ static void mnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
tbufWriteChar(bw, ',');
}
static void mnodeAddStringField(SBufferWriter* bw, const char* k, const char* v) {
mnodeWriteString(bw, k);
static void mndAddStringField(SBufferWriter* bw, const char* k, const char* v) {
mndWriteString(bw, k);
tbufWriteChar(bw, ':');
mnodeWriteString(bw, v);
mndWriteString(bw, v);
tbufWriteChar(bw, ',');
}
static void mnodeAddCpuInfo(SBufferWriter* bw) {
static void mndAddCpuInfo(SBufferWriter* bw) {
char* line = NULL;
size_t size = 0;
int32_t done = 0;
......@@ -100,11 +100,11 @@ static void mnodeAddCpuInfo(SBufferWriter* bw) {
line[size - 1] = '\0';
if (((done & 1) == 0) && strncmp(line, "model name", 10) == 0) {
const char* v = strchr(line, ':') + 2;
mnodeAddStringField(bw, "cpuModel", v);
mndAddStringField(bw, "cpuModel", v);
done |= 1;
} else if (((done & 2) == 0) && strncmp(line, "cpu cores", 9) == 0) {
const char* v = strchr(line, ':') + 2;
mnodeWriteString(bw, "numOfCpu");
mndWriteString(bw, "numOfCpu");
tbufWriteChar(bw, ':');
tbufWrite(bw, v, strlen(v));
tbufWriteChar(bw, ',');
......@@ -116,7 +116,7 @@ static void mnodeAddCpuInfo(SBufferWriter* bw) {
fclose(fp);
}
static void mnodeAddOsInfo(SBufferWriter* bw) {
static void mndAddOsInfo(SBufferWriter* bw) {
char* line = NULL;
size_t size = 0;
......@@ -133,7 +133,7 @@ static void mnodeAddOsInfo(SBufferWriter* bw) {
p++;
line[size - 2] = 0;
}
mnodeAddStringField(bw, "os", p);
mndAddStringField(bw, "os", p);
break;
}
}
......@@ -142,7 +142,7 @@ static void mnodeAddOsInfo(SBufferWriter* bw) {
fclose(fp);
}
static void mnodeAddMemoryInfo(SBufferWriter* bw) {
static void mndAddMemoryInfo(SBufferWriter* bw) {
char* line = NULL;
size_t size = 0;
......@@ -156,7 +156,7 @@ static void mnodeAddMemoryInfo(SBufferWriter* bw) {
if (strncmp(line, "MemTotal", 8) == 0) {
const char* p = strchr(line, ':') + 1;
while (*p == ' ') p++;
mnodeAddStringField(bw, "memory", p);
mndAddStringField(bw, "memory", p);
break;
}
}
......@@ -165,32 +165,32 @@ static void mnodeAddMemoryInfo(SBufferWriter* bw) {
fclose(fp);
}
static void mnodeAddVersionInfo(SBufferWriter* bw) {
mnodeAddStringField(bw, "version", version);
mnodeAddStringField(bw, "buildInfo", buildinfo);
mnodeAddStringField(bw, "gitInfo", gitinfo);
mnodeAddStringField(bw, "email", tsTelem.email);
static void mndAddVersionInfo(SBufferWriter* bw) {
mndAddStringField(bw, "version", version);
mndAddStringField(bw, "buildInfo", buildinfo);
mndAddStringField(bw, "gitInfo", gitinfo);
mndAddStringField(bw, "email", tsTelem.email);
}
static void mnodeAddRuntimeInfo(SBufferWriter* bw) {
static void mndAddRuntimeInfo(SBufferWriter* bw) {
SMnodeLoad load = {0};
if (mndGetLoad(NULL, &load) != 0) {
return;
}
mnodeAddIntField(bw, "numOfDnode", load.numOfDnode);
mnodeAddIntField(bw, "numOfMnode", load.numOfMnode);
mnodeAddIntField(bw, "numOfVgroup", load.numOfVgroup);
mnodeAddIntField(bw, "numOfDatabase", load.numOfDatabase);
mnodeAddIntField(bw, "numOfSuperTable", load.numOfSuperTable);
mnodeAddIntField(bw, "numOfChildTable", load.numOfChildTable);
mnodeAddIntField(bw, "numOfColumn", load.numOfColumn);
mnodeAddIntField(bw, "numOfPoint", load.totalPoints);
mnodeAddIntField(bw, "totalStorage", load.totalStorage);
mnodeAddIntField(bw, "compStorage", load.compStorage);
mndAddIntField(bw, "numOfDnode", load.numOfDnode);
mndAddIntField(bw, "numOfMnode", load.numOfMnode);
mndAddIntField(bw, "numOfVgroup", load.numOfVgroup);
mndAddIntField(bw, "numOfDatabase", load.numOfDatabase);
mndAddIntField(bw, "numOfSuperTable", load.numOfSuperTable);
mndAddIntField(bw, "numOfChildTable", load.numOfChildTable);
mndAddIntField(bw, "numOfColumn", load.numOfColumn);
mndAddIntField(bw, "numOfPoint", load.totalPoints);
mndAddIntField(bw, "totalStorage", load.totalStorage);
mndAddIntField(bw, "compStorage", load.compStorage);
}
static void mnodeSendTelemetryReport() {
static void mndSendTelemetryReport() {
char buf[128] = {0};
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
if (ip == 0xffffffff) {
......@@ -208,15 +208,15 @@ static void mnodeSendTelemetryReport() {
snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false);
mnodeBeginObject(&bw);
mnodeAddStringField(&bw, "instanceId", clusterIdStr);
mnodeAddIntField(&bw, "reportVersion", 1);
mnodeAddOsInfo(&bw);
mnodeAddCpuInfo(&bw);
mnodeAddMemoryInfo(&bw);
mnodeAddVersionInfo(&bw);
mnodeAddRuntimeInfo(&bw);
mnodeCloseObject(&bw);
mndBeginObject(&bw);
mndAddStringField(&bw, "instanceId", clusterIdStr);
mndAddIntField(&bw, "reportVersion", 1);
mndAddOsInfo(&bw);
mndAddCpuInfo(&bw);
mndAddMemoryInfo(&bw);
mndAddVersionInfo(&bw);
mndAddRuntimeInfo(&bw);
mndCloseObject(&bw);
const char* header =
"POST /report HTTP/1.1\n"
......@@ -240,12 +240,12 @@ static void mnodeSendTelemetryReport() {
taosCloseSocket(fd);
}
static void* mnodeTelemThreadFp(void* param) {
static void* mndTelemThreadFp(void* param) {
struct timespec end = {0};
clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes before send first report
setThreadName("mnode-telem");
setThreadName("mnd-telem");
while (!tsTelem.exit) {
int32_t r = 0;
......@@ -256,8 +256,8 @@ static void* mnodeTelemThreadFp(void* param) {
if (r == 0) break;
if (r != ETIMEDOUT) continue;
if (mnodeIsMaster()) {
mnodeSendTelemetryReport();
if (mndIsMaster()) {
mndSendTelemetryReport();
}
end.tv_sec += REPORT_INTERVAL;
}
......@@ -265,7 +265,7 @@ static void* mnodeTelemThreadFp(void* param) {
return NULL;
}
static void mnodeGetEmail(char* filepath) {
static void mndGetEmail(char* filepath) {
int32_t fd = taosOpenFileRead(filepath);
if (fd < 0) {
return;
......@@ -287,19 +287,19 @@ int32_t mndInitTelem() {
pthread_cond_init(&tsTelem.cond, NULL);
tsTelem.email[0] = 0;
mnodeGetEmail("/usr/local/taos/email");
mndGetEmail("/usr/local/taos/email");
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int32_t code = pthread_create(&tsTelem.thread, &attr, mnodeTelemThreadFp, NULL);
int32_t code = pthread_create(&tsTelem.thread, &attr, mndTelemThreadFp, NULL);
pthread_attr_destroy(&attr);
if (code != 0) {
mTrace("failed to create telemetry thread since :%s", strerror(code));
}
mInfo("mnode telemetry is initialized");
mInfo("mnd telemetry is initialized");
return 0;
}
......
......@@ -15,14 +15,12 @@
#define _DEFAULT_SOURCE
#include "mndSync.h"
#include "os.h"
#include "tglobal.h"
#include "tkey.h"
#include "mndTrans.h"
#define SDB_USER_VER 1
static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) {
static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SAcctObj));
if (pRaw == NULL) return NULL;
......@@ -38,7 +36,7 @@ static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) {
return pRaw;
}
static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) {
static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
......@@ -62,7 +60,7 @@ static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) {
return pRow;
}
static int32_t mnodeUserActionInsert(SUserObj *pUser) {
static int32_t mndUserActionInsert(SUserObj *pUser) {
pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pUser->prohibitDbHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -78,7 +76,7 @@ static int32_t mnodeUserActionInsert(SUserObj *pUser) {
return 0;
}
static int32_t mnodeUserActionDelete(SUserObj *pUser) {
static int32_t mndUserActionDelete(SUserObj *pUser) {
if (pUser->prohibitDbHash) {
taosHashCleanup(pUser->prohibitDbHash);
pUser->prohibitDbHash = NULL;
......@@ -92,14 +90,14 @@ static int32_t mnodeUserActionDelete(SUserObj *pUser) {
return 0;
}
static int32_t mnodeUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) {
static int32_t mndUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) {
SUserObj tObj;
int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj);
memcpy(pDstUser, pSrcUser, len);
return 0;
}
static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) {
static int32_t mndCreateDefaultUser(char *acct, char *user, char *pass) {
SUserObj userObj = {0};
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
......@@ -111,30 +109,26 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) {
userObj.rootAuth = 1;
}
SSdbRaw *pRaw = mnodeUserActionEncode(&userObj);
SSdbRaw *pRaw = mndUserActionEncode(&userObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
return sdbWrite(pRaw);
}
static int32_t mnodeCreateDefaultUsers() {
if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
static int32_t mndCreateDefaultUsers() {
if (mndCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "monitor", tsInternalPass) != 0) {
return -1;
}
if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, tsInternalPass) != 0) {
if (mndCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
return 0;
}
static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pMsg) {
static int32_t mndCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pMsg) {
SUserObj userObj = {0};
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
......@@ -146,7 +140,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) return -1;
SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) {
mError("failed to append redo log since %s", terrstr());
trnDrop(pTrans);
......@@ -154,7 +148,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj);
SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) {
mError("failed to append undo log since %s", terrstr());
trnDrop(pTrans);
......@@ -162,7 +156,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj);
SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj);
if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) {
mError("failed to append commit log since %s", terrstr());
trnDrop(pTrans);
......@@ -170,7 +164,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (trnPrepare(pTrans, mnodeSyncPropose) != 0) {
if (trnPrepare(pTrans, mndSyncPropose) != 0) {
trnDrop(pTrans);
return -1;
}
......@@ -179,7 +173,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
return 0;
}
static int32_t mnodeProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
if (pCreate->user[0] == 0) {
......@@ -209,7 +203,7 @@ static int32_t mnodeProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return -1;
}
int32_t code = mnodeCreateUser(pOperUser->acct, pCreate->user, pCreate->pass, pMsg);
int32_t code = mndCreateUser(pOperUser->acct, pCreate->user, pCreate->pass, pMsg);
sdbRelease(pOperUser);
if (code != 0) {
......@@ -223,15 +217,15 @@ static int32_t mnodeProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
int32_t mndInitUser() {
SSdbTable table = {.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)mnodeCreateDefaultUsers,
.encodeFp = (SdbEncodeFp)mnodeUserActionEncode,
.decodeFp = (SdbDecodeFp)mnodeUserActionDecode,
.insertFp = (SdbInsertFp)mnodeUserActionInsert,
.updateFp = (SdbUpdateFp)mnodeUserActionUpdate,
.deleteFp = (SdbDeleteFp)mnodeUserActionDelete};
.deployFp = (SdbDeployFp)mndCreateDefaultUsers,
.encodeFp = (SdbEncodeFp)mndUserActionEncode,
.decodeFp = (SdbDecodeFp)mndUserActionDecode,
.insertFp = (SdbInsertFp)mndUserActionInsert,
.updateFp = (SdbUpdateFp)mndUserActionUpdate,
.deleteFp = (SdbDeleteFp)mndUserActionDelete};
sdbSetTable(table);
mndSetMsgHandle(NULL, TSDB_MSG_TYPE_CREATE_USER, mnodeProcessCreateUserMsg);
mndSetMsgHandle(NULL, TSDB_MSG_TYPE_CREATE_USER, mndProcessCreateUserMsg);
return 0;
}
......
......@@ -278,28 +278,44 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {}
static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
int32_t code = 0;
int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType % 2 == 1);
if (!mnodeIsMaster(pMnode)) {
mndSendRedirectMsg(pMnode, &pMsg->rpcMsg);
mndCleanupMsg(pMsg);
return;
if (isReq && !mndIsMaster(pMnode)) {
code = TSDB_CODE_APP_NOT_READY;
goto PROCESS_RPC_END;
}
if (isReq && pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_INVALID_MSG_LEN;
goto PROCESS_RPC_END;
}
int32_t msgType = pMsg->rpcMsg.msgType;
MndMsgFp fp = pMnode->msgFp[msgType];
if (fp == NULL) {
mError("RPC %p, req:%s is not processed", pMsg->rpcMsg.handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
mndCleanupMsg(pMsg);
return;
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MSG_NOT_PROCESSED;
goto PROCESS_RPC_END;
}
int32_t code = (*fp)(pMnode, pMsg);
code = (*fp)(pMnode, pMsg);
if (code != 0) {
mError("RPC %p, req:%s processed error since %s", pMsg->rpcMsg.handle, taosMsg[msgType], tstrerror(code));
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], tstrerror(code));
goto PROCESS_RPC_END;
}
PROCESS_RPC_END:
if (isReq) {
if (code == TSDB_CODE_APP_NOT_READY) {
mndSendRedirectMsg(pMnode, &pMsg->rpcMsg);
} else if (code != 0) {
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rspMsg);
} else {
}
}
mndCleanupMsg(pMsg);
......@@ -318,139 +334,3 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mndProcessApplyMsg(SMnodeMsg *pMsg) {}
#if 0
static void mnodeProcessWriteReq(SMnodeMsg *pMsg, void *unused) {
int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
int32_t code = 0;
if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_INVALID_MSG_LEN;
goto PROCESS_WRITE_REQ_END;
}
if (!mnodeIsMaster()) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
mnodeGetMnodeEpSetForShell(epSet, true);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SEpSet);
mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
code = TSDB_CODE_RPC_REDIRECT;
goto PROCESS_WRITE_REQ_END;
}
if (tsMworker.writeMsgFp[msgType] == NULL) {
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
goto PROCESS_WRITE_REQ_END;
}
code = (*tsMworker.writeMsgFp[msgType])(pMsg);
PROCESS_WRITE_REQ_END:
mndSendRsp(pMsg, code);
}
static void mnodeProcessReadReq(SMnodeMsg *pMsg, void *unused) {
int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
int32_t code = 0;
if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_INVALID_MSG_LEN;
goto PROCESS_READ_REQ_END;
}
if (!mnodeIsMaster()) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
if (!epSet) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto PROCESS_READ_REQ_END;
}
mnodeGetMnodeEpSetForShell(epSet, true);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SEpSet);
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
epSet->numOfEps, epSet->inUse);
code = TSDB_CODE_RPC_REDIRECT;
goto PROCESS_READ_REQ_END;
}
if (tsMworker.readMsgFp[msgType] == NULL) {
mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
goto PROCESS_READ_REQ_END;
}
mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]);
code = (*tsMworker.readMsgFp[msgType])(pMsg);
PROCESS_READ_REQ_END:
mndSendRsp(pMsg, code);
}
static void mnodeProcessPeerReq(SMnodeMsg *pMsg, void *unused) {
int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
int32_t code = 0;
if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_INVALID_MSG_LEN;
goto PROCESS_PEER_REQ_END;
}
if (!mnodeIsMaster()) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
mnodeGetMnodeEpSetForPeer(epSet, true);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SEpSet);
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
code = TSDB_CODE_RPC_REDIRECT;
goto PROCESS_PEER_REQ_END;
}
if (tsMworker.peerReqFp[msgType] == NULL) {
mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
goto PROCESS_PEER_REQ_END;
}
code = (*tsMworker.peerReqFp[msgType])(pMsg);
PROCESS_PEER_REQ_END:
mndSendRsp(pMsg, code);
}
static void mnodeProcessPeerRsp(SMnodeMsg *pMsg, void *unused) {
int32_t msgType = pMsg->rpcMsg.msgType;
SRpcMsg *pRpcMsg = &pMsg->rpcMsg;
if (!mnodeIsMaster()) {
mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
mndCleanupMsg2(pMsg);
}
if (tsMworker.peerRspFp[msgType]) {
(*tsMworker.peerRspFp[msgType])(pRpcMsg);
} else {
mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
}
mndCleanupMsg2(pMsg);
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册