提交 3f88dbe2 编写于 作者: D dapan1121

enh: refact client server messages

上级 1b0c9843
...@@ -1741,6 +1741,9 @@ typedef struct { ...@@ -1741,6 +1741,9 @@ typedef struct {
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq); int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
typedef struct { typedef struct {
int32_t code; int32_t code;
......
...@@ -756,7 +756,7 @@ void tmqSendHbReq(void* param, void* tmrId) { ...@@ -756,7 +756,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
} }
sendInfo->msgInfo = (SDataBuf){ sendInfo->msgInfo = (SDataBuf){
.pData = pReq, .pData = pReq,
.len = sizeof(SMqHbReq), .len = tlen,
.handle = NULL, .handle = NULL,
}; };
......
...@@ -4778,6 +4778,43 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) ...@@ -4778,6 +4778,43 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq)
return 0; return 0;
} }
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->code) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tbFName) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->sversion) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->tversion) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->affectedRows) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tbFName) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
......
...@@ -65,19 +65,37 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c ...@@ -65,19 +65,37 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL; STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL;
int64_t affectedRows = ctx ? ctx->affectedRows : 0; int64_t affectedRows = ctx ? ctx->affectedRows : 0;
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp rsp = {0};
pRsp->code = htonl(code); rsp.code = code;
pRsp->affectedRows = htobe64(affectedRows); rsp.affectedRows = affectedRows;
if (tbInfo) { if (tbInfo) {
strcpy(pRsp->tbFName, tbInfo->tbFName); strcpy(rsp.tbFName, tbInfo->tbFName);
pRsp->sversion = htonl(tbInfo->sversion); rsp.sversion = tbInfo->sversion;
pRsp->tversion = htonl(tbInfo->tversion); rsp.tversion = tbInfo->tversion;
}
int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp);
if (msgSize < 0) {
qError("tSerializeSQueryTableRsp failed");
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *pRsp = rpcMallocCont(msgSize);
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) {
qError("tSerializeSQueryTableRsp %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = rspType, .msgType = rspType,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = msgSize,
.code = code, .code = code,
.info = *pConn, .info = *pConn,
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册