提交 108b9ffe 编写于 作者: S Shengliang Guan

refact: add sequence for status msg

上级 b98e3f34
......@@ -1128,6 +1128,7 @@ typedef struct {
SQnodeLoad qload;
SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad
int32_t statusSeq;
} SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
......@@ -1149,6 +1150,7 @@ typedef struct {
int64_t dnodeVer;
SDnodeCfg dnodeCfg;
SArray* pDnodeEps; // Array of SDnodeEp
int32_t statusSeq;
} SStatusRsp;
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
......
......@@ -1020,6 +1020,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->qload.timeInQueryQueue) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1;
if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -1095,6 +1096,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->qload.timeInQueryQueue) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......@@ -1126,6 +1128,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
if (tEncodeU16(&encoder, pDnodeEp->ep.port) < 0) return -1;
}
if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -1167,6 +1170,7 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
}
}
if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......
......@@ -36,6 +36,7 @@ typedef struct SDnodeMgmt {
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq;
} SDnodeMgmt;
// dmHandle.c
......
......@@ -32,9 +32,13 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
}
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
const STraceId *trace = &pRsp->info.traceId;
dGTrace("status msg received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
if (pRsp->code != 0) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
pMgmt->statusSeq);
pMgmt->pData->dropped = 1;
dmWriteEps(pMgmt->pData);
}
......@@ -42,9 +46,9 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
dTrace("status msg received from mnode, dnodeVer:%" PRId64 " saved:%" PRId64, statusRsp.dnodeVer,
pMgmt->pData->dnodeVer);
if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
dGInfo("status msg received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
......@@ -91,6 +95,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(*pMgmt->getQnodeLoadsFp)(&req.qload);
pMgmt->statusSeq++;
req.statusSeq = pMgmt->statusSeq;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeSStatusReq(pHead, contLen, &req);
......@@ -99,7 +106,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527};
SRpcMsg rpcRsp = {0};
dTrace("send status msg to mnode, dnodeVer:%" PRId64, req.dnodeVer);
dTrace("send status msg to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet);
......
......@@ -345,6 +345,19 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
}
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot;
pDnode->accessTimes++;
pDnode->lastAccessTime = curMs;
const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
......@@ -396,18 +409,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseQnode(pMnode, pQnode);
}
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot;
pDnode->accessTimes++;
pDnode->lastAccessTime = curMs;
mTrace("dnode:%d, status received, access times:%d check:%d online:%d reboot:%d changed:%d", pDnode->id,
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged);
if (needCheck) {
if (statusReq.sver != tsVersion) {
if (pDnode != NULL) {
......@@ -455,6 +456,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->memTotal = statusReq.memTotal;
SStatusRsp statusRsp = {0};
statusRsp.statusSeq++;
statusRsp.dnodeVer = dnodeVer;
statusRsp.dnodeCfg.dnodeId = pDnode->id;
statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册