提交 03c026a5 编写于 作者: S Shengliang Guan

serialize status msg

上级 c8cc7ed3
...@@ -696,8 +696,8 @@ typedef struct { ...@@ -696,8 +696,8 @@ typedef struct {
SArray* pVloads; // array of SVnodeLoad SArray* pVloads; // array of SVnodeLoad
} SStatusReq; } SStatusReq;
int32_t tSerializeSStatusReq(void** buf, SStatusReq* pReq); int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
void* tDeserializeSStatusReq(void* buf, SStatusReq* pReq); int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
...@@ -716,8 +716,8 @@ typedef struct { ...@@ -716,8 +716,8 @@ typedef struct {
SArray* pDnodeEps; // Array of SDnodeEp SArray* pDnodeEps; // Array of SDnodeEp
} SStatusRsp; } SStatusRsp;
int32_t tSerializeSStatusRsp(void** buf, SStatusRsp* pRsp); int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
void* tDeserializeSStatusRsp(void* buf, SStatusRsp* pRsp); int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
typedef struct { typedef struct {
int32_t reserve; int32_t reserve;
......
...@@ -583,144 +583,172 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { ...@@ -583,144 +583,172 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) {
pReq->pFields = NULL; pReq->pFields = NULL;
} }
int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) { int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
int32_t tlen = 0; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
// status // status
tlen += taosEncodeFixedI32(buf, pReq->sver); if (tEncodeI32(&encoder, pReq->sver) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pReq->dver); if (tEncodeI64(&encoder, pReq->dver) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pReq->dnodeId); if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pReq->clusterId); if (tEncodeI64(&encoder, pReq->clusterId) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pReq->rebootTime); if (tEncodeI64(&encoder, pReq->rebootTime) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pReq->updateTime); if (tEncodeI64(&encoder, pReq->updateTime) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pReq->numOfCores); if (tEncodeI32(&encoder, pReq->numOfCores) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pReq->numOfSupportVnodes); if (tEncodeI32(&encoder, pReq->numOfSupportVnodes) < 0) return -1;
tlen += taosEncodeString(buf, pReq->dnodeEp); if (tEncodeCStr(&encoder, pReq->dnodeEp) < 0) return -1;
// cluster cfg // cluster cfg
tlen += taosEncodeFixedI32(buf, pReq->clusterCfg.statusInterval); if (tEncodeI32(&encoder, pReq->clusterCfg.statusInterval) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pReq->clusterCfg.checkTime); if (tEncodeI64(&encoder, pReq->clusterCfg.checkTime) < 0) return -1;
tlen += taosEncodeString(buf, pReq->clusterCfg.timezone); if (tEncodeCStr(&encoder, pReq->clusterCfg.timezone) < 0) return -1;
tlen += taosEncodeString(buf, pReq->clusterCfg.locale); if (tEncodeCStr(&encoder, pReq->clusterCfg.locale) < 0) return -1;
tlen += taosEncodeString(buf, pReq->clusterCfg.charset); if (tEncodeCStr(&encoder, pReq->clusterCfg.charset) < 0) return -1;
// vnode loads // vnode loads
int32_t vlen = (int32_t)taosArrayGetSize(pReq->pVloads); int32_t vlen = (int32_t)taosArrayGetSize(pReq->pVloads);
tlen += taosEncodeFixedI32(buf, vlen); if (tEncodeI32(&encoder, vlen) < 0) return -1;
for (int32_t i = 0; i < vlen; ++i) { for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i); SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
tlen += taosEncodeFixedI32(buf, pload->vgId); if (tEncodeI32(&encoder, pload->vgId) < 0) return -1;
tlen += taosEncodeFixedI8(buf, pload->role); if (tEncodeI8(&encoder, pload->role) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pload->numOfTables); if (tEncodeI64(&encoder, pload->numOfTables) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pload->numOfTimeSeries); if (tEncodeI64(&encoder, pload->numOfTimeSeries) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pload->totalStorage); if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pload->compStorage); if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pload->pointsWritten); if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
} }
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen; return tlen;
} }
void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) { int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
// status // status
buf = taosDecodeFixedI32(buf, &pReq->sver); if (tDecodeI32(&decoder, &pReq->sver) < 0) return -1;
buf = taosDecodeFixedI64(buf, &pReq->dver); if (tDecodeI64(&decoder, &pReq->dver) < 0) return -1;
buf = taosDecodeFixedI32(buf, &pReq->dnodeId); if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
buf = taosDecodeFixedI64(buf, &pReq->clusterId); if (tDecodeI64(&decoder, &pReq->clusterId) < 0) return -1;
buf = taosDecodeFixedI64(buf, &pReq->rebootTime); if (tDecodeI64(&decoder, &pReq->rebootTime) < 0) return -1;
buf = taosDecodeFixedI64(buf, &pReq->updateTime); if (tDecodeI64(&decoder, &pReq->updateTime) < 0) return -1;
buf = taosDecodeFixedI32(buf, &pReq->numOfCores); if (tDecodeI32(&decoder, &pReq->numOfCores) < 0) return -1;
buf = taosDecodeFixedI32(buf, &pReq->numOfSupportVnodes); if (tDecodeI32(&decoder, &pReq->numOfSupportVnodes) < 0) return -1;
buf = taosDecodeStringTo(buf, pReq->dnodeEp); if (tDecodeCStrTo(&decoder, pReq->dnodeEp) < 0) return -1;
// cluster cfg // cluster cfg
buf = taosDecodeFixedI32(buf, &pReq->clusterCfg.statusInterval); if (tDecodeI32(&decoder, &pReq->clusterCfg.statusInterval) < 0) return -1;
buf = taosDecodeFixedI64(buf, &pReq->clusterCfg.checkTime); if (tDecodeI64(&decoder, &pReq->clusterCfg.checkTime) < 0) return -1;
buf = taosDecodeStringTo(buf, pReq->clusterCfg.timezone); if (tDecodeCStrTo(&decoder, pReq->clusterCfg.timezone) < 0) return -1;
buf = taosDecodeStringTo(buf, pReq->clusterCfg.locale); if (tDecodeCStrTo(&decoder, pReq->clusterCfg.locale) < 0) return -1;
buf = taosDecodeStringTo(buf, pReq->clusterCfg.charset); if (tDecodeCStrTo(&decoder, pReq->clusterCfg.charset) < 0) return -1;
// vnode loads // vnode loads
int32_t vlen = 0; int32_t vlen = 0;
buf = taosDecodeFixedI32(buf, &vlen); if (tDecodeI32(&decoder, &vlen) < 0) return -1;
pReq->pVloads = taosArrayInit(vlen, sizeof(SVnodeLoad)); pReq->pVloads = taosArrayInit(vlen, sizeof(SVnodeLoad));
if (pReq->pVloads == NULL) { if (pReq->pVloads == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
for (int32_t i = 0; i < vlen; ++i) { for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad vload = {0}; SVnodeLoad vload = {0};
buf = taosDecodeFixedI32(buf, &vload.vgId); if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
buf = taosDecodeFixedI8(buf, &vload.role); if (tDecodeI8(&decoder, &vload.role) < 0) return -1;
buf = taosDecodeFixedI64(buf, &vload.numOfTables); if (tDecodeI64(&decoder, &vload.numOfTables) < 0) return -1;
buf = taosDecodeFixedI64(buf, &vload.numOfTimeSeries); if (tDecodeI64(&decoder, &vload.numOfTimeSeries) < 0) return -1;
buf = taosDecodeFixedI64(buf, &vload.totalStorage); if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1;
buf = taosDecodeFixedI64(buf, &vload.compStorage); if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
buf = taosDecodeFixedI64(buf, &vload.pointsWritten); if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) { if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
} }
return buf; tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
} }
int32_t tSerializeSStatusRsp(void **buf, SStatusRsp *pRsp) { int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
int32_t tlen = 0; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
// status; if (tStartEncode(&encoder) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pRsp->dver);
// status
if (tEncodeI64(&encoder, pRsp->dver) < 0) return -1;
// dnode cfg // dnode cfg
tlen += taosEncodeFixedI32(buf, pRsp->dnodeCfg.dnodeId); if (tEncodeI32(&encoder, pRsp->dnodeCfg.dnodeId) < 0) return -1;
tlen += taosEncodeFixedI64(buf, pRsp->dnodeCfg.clusterId); if (tEncodeI64(&encoder, pRsp->dnodeCfg.clusterId) < 0) return -1;
// dnode eps // dnode eps
int32_t dlen = (int32_t)taosArrayGetSize(pRsp->pDnodeEps); int32_t dlen = (int32_t)taosArrayGetSize(pRsp->pDnodeEps);
tlen += taosEncodeFixedI32(buf, dlen); if (tEncodeI32(&encoder, dlen) < 0) return -1;
for (int32_t i = 0; i < dlen; ++i) { for (int32_t i = 0; i < dlen; ++i) {
SDnodeEp *pDnodeEp = taosArrayGet(pRsp->pDnodeEps, i); SDnodeEp *pDnodeEp = taosArrayGet(pRsp->pDnodeEps, i);
tlen += taosEncodeFixedI32(buf, pDnodeEp->id); if (tEncodeI32(&encoder, pDnodeEp->id) < 0) return -1;
tlen += taosEncodeFixedI8(buf, pDnodeEp->isMnode); if (tEncodeI8(&encoder, pDnodeEp->isMnode) < 0) return -1;
tlen += taosEncodeString(buf, pDnodeEp->ep.fqdn); if (tEncodeCStr(&encoder, pDnodeEp->ep.fqdn) < 0) return -1;
tlen += taosEncodeFixedU16(buf, pDnodeEp->ep.port); if (tEncodeU16(&encoder, pDnodeEp->ep.port) < 0) return -1;
} }
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen; return tlen;
} }
void *tDeserializeSStatusRsp(void *buf, SStatusRsp *pRsp) { int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
// status // status
buf = taosDecodeFixedI64(buf, &pRsp->dver); if (tDecodeI64(&decoder, &pRsp->dver) < 0) return -1;
// cluster cfg // cluster cfg
buf = taosDecodeFixedI32(buf, &pRsp->dnodeCfg.dnodeId); if (tDecodeI32(&decoder, &pRsp->dnodeCfg.dnodeId) < 0) return -1;
buf = taosDecodeFixedI64(buf, &pRsp->dnodeCfg.clusterId); if (tDecodeI64(&decoder, &pRsp->dnodeCfg.clusterId) < 0) return -1;
// dnode eps // dnode eps
int32_t dlen = 0; int32_t dlen = 0;
buf = taosDecodeFixedI32(buf, &dlen); if (tDecodeI32(&decoder, &dlen) < 0) return -1;
pRsp->pDnodeEps = taosArrayInit(dlen, sizeof(SDnodeEp)); pRsp->pDnodeEps = taosArrayInit(dlen, sizeof(SDnodeEp));
if (pRsp->pDnodeEps == NULL) { if (pRsp->pDnodeEps == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
for (int32_t i = 0; i < dlen; ++i) { for (int32_t i = 0; i < dlen; ++i) {
SDnodeEp dnodeEp = {0}; SDnodeEp dnodeEp = {0};
buf = taosDecodeFixedI32(buf, &dnodeEp.id); if (tDecodeI32(&decoder, &dnodeEp.id) < 0) return -1;
buf = taosDecodeFixedI8(buf, &dnodeEp.isMnode); if (tDecodeI8(&decoder, &dnodeEp.isMnode) < 0) return -1;
buf = taosDecodeStringTo(buf, dnodeEp.ep.fqdn); if (tDecodeCStrTo(&decoder, dnodeEp.ep.fqdn) < 0) return -1;
buf = taosDecodeFixedU16(buf, &dnodeEp.ep.port); if (tDecodeU16(&decoder, &dnodeEp.ep.port) < 0) return -1;
if (taosArrayPush(pRsp->pDnodeEps, &dnodeEp) == NULL) { if (taosArrayPush(pRsp->pDnodeEps, &dnodeEp) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
} }
return buf; tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
} }
int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) { int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
......
...@@ -379,10 +379,9 @@ void dndSendStatusReq(SDnode *pDnode) { ...@@ -379,10 +379,9 @@ void dndSendStatusReq(SDnode *pDnode) {
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
dndGetVnodeLoads(pDnode, req.pVloads); dndGetVnodeLoads(pDnode, req.pVloads);
int32_t contLen = tSerializeSStatusReq(NULL, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);
void *pBuf = pHead; tSerializeSStatusReq(pHead, contLen, &req);
tSerializeSStatusReq(&pBuf, &req);
taosArrayDestroy(req.pVloads); taosArrayDestroy(req.pVloads);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
...@@ -437,7 +436,8 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { ...@@ -437,7 +436,8 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
} }
} else { } else {
SStatusRsp statusRsp = {0}; SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen != 0 && tDeserializeSStatusRsp(pRsp->pCont, &statusRsp) != NULL) { if (pRsp->pCont != NULL && pRsp->contLen != 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->dver = statusRsp.dver; pMgmt->dver = statusRsp.dver;
dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg); dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
dndUpdateDnodeEps(pDnode, statusRsp.pDnodeEps); dndUpdateDnodeEps(pDnode, statusRsp.pDnodeEps);
......
...@@ -302,7 +302,10 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { ...@@ -302,7 +302,10 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
int32_t code = -1; int32_t code = -1;
if (tDeserializeSStatusReq(pReq->rpcMsg.pCont, &statusReq) == NULL) goto PROCESS_STATUS_MSG_OVER; if (tDeserializeSStatusReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &statusReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto PROCESS_STATUS_MSG_OVER;
}
if (statusReq.dnodeId == 0) { if (statusReq.dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
...@@ -410,10 +413,9 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { ...@@ -410,10 +413,9 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
mndGetDnodeData(pMnode, statusRsp.pDnodeEps); mndGetDnodeData(pMnode, statusRsp.pDnodeEps);
int32_t contLen = tSerializeSStatusRsp(NULL, &statusRsp); int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);
void *pBuf = pHead; tSerializeSStatusRsp(pHead, contLen, &statusRsp);
tSerializeSStatusRsp(&pBuf, &statusRsp);
taosArrayDestroy(statusRsp.pDnodeEps); taosArrayDestroy(statusRsp.pDnodeEps);
pReq->contLen = contLen; pReq->contLen = contLen;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册