未验证 提交 a3f0283e 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18226 from taosdata/enh/msgRefactor0

enh: refactor batch meta msg
......@@ -3169,8 +3169,7 @@ typedef struct {
typedef struct {
SMsgHead header;
int32_t msgNum;
SBatchMsg msg[];
SArray* pMsgs; //SArray<SBatchMsg>
} SBatchReq;
typedef struct {
......@@ -3179,17 +3178,40 @@ typedef struct {
int32_t msgLen;
int32_t rspCode;
void* msg;
} SBatchRspMsg;
typedef struct {
SArray* pRsps; //SArray<SBatchRspMsg>
} SBatchRsp;
static FORCE_INLINE void tFreeSBatchRsp(void* p) {
int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq);
int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq);
static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) {
if (NULL == msg) {
return;
}
SBatchMsg* pMsg = (SBatchMsg*)msg;
taosMemoryFree(pMsg->msg);
}
int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp);
int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp);
static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
if (NULL == p) {
return;
}
SBatchRsp* pRsp = (SBatchRsp*)p;
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
taosMemoryFree(pRsp->msg);
}
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq);
int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq);
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq);
int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq);
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -351,7 +351,7 @@ typedef struct SVgDataBlocks {
SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
void* pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ...
void* pData; // SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef void (*FFreeDataBlockHash)(SHashObj*);
......
......@@ -728,12 +728,26 @@ void tmqSendHbReq(void* param, void* tmrId) {
taosMemoryFree(param);
return;
}
int64_t consumerId = tmq->consumerId;
int32_t epoch = tmq->epoch;
SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
if (pReq == NULL) goto OVER;
pReq->consumerId = htobe64(consumerId);
pReq->epoch = epoch;
SMqHbReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
if (tlen < 0) {
tscError("tSerializeSMqHbReq failed");
return;
}
void *pReq = taosMemoryCalloc(1, tlen);
if (tlen < 0) {
tscError("failed to malloc MqHbReq msg, size:%d", tlen);
return;
}
if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
tscError("tSerializeSMqHbReq %d failed", tlen);
taosMemoryFree(pReq);
return;
}
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
......@@ -742,7 +756,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqHbReq),
.len = tlen,
.handle = NULL,
};
......@@ -1378,21 +1392,31 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
int32_t tlen = sizeof(SMqAskEpReq);
SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
if (req == NULL) {
tscError("failed to malloc get subscribe ep buf");
/*atomic_store_8(&tmq->epStatus, 0);*/
SMqAskEpReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
strcpy(req.cgroup, tmq->groupId);
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("tSerializeSMqAskEpReq failed");
return -1;
}
void *pReq = taosMemoryCalloc(1, tlen);
if (tlen < 0) {
tscError("failed to malloc askEpReq msg, size:%d", tlen);
return -1;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("tSerializeSMqAskEpReq %d failed", tlen);
taosMemoryFree(pReq);
return -1;
}
req->consumerId = htobe64(tmq->consumerId);
req->epoch = htonl(tmq->epoch);
strcpy(req->cgroup, tmq->groupId);
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("failed to malloc subscribe param");
taosMemoryFree(req);
taosMemoryFree(pReq);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
......@@ -1405,13 +1429,13 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
if (sendInfo == NULL) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
taosMemoryFree(req);
taosMemoryFree(pReq);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
sendInfo->msgInfo = (SDataBuf){
.pData = req,
.pData = pReq,
.len = tlen,
.handle = NULL,
};
......
......@@ -4446,6 +4446,204 @@ void tFreeSExplainRsp(SExplainRsp *pRsp) {
taosMemoryFreeClear(pRsp->subplanInfo);
}
int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pReq->pMsgs);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SBatchMsg *pMsg = taosArrayGet(pReq->pMsgs, i);
if (tEncodeI32(&encoder, pMsg->msgIdx) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->msgType) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->msgLen) < 0) return -1;
if (tEncodeBinary(&encoder, pMsg->msg, pMsg->msgLen) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->header.vgId;
pHead->contLen = pReq->header.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num <= 0) {
pReq->pMsgs = NULL;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
pReq->pMsgs = taosArrayInit(num, sizeof(SBatchMsg));
if (NULL == pReq->pMsgs) return -1;
for (int32_t i = 0; i < num; ++i) {
SBatchMsg msg = {0};
if (tDecodeI32(&decoder, &msg.msgIdx) < 0) return -1;
if (tDecodeI32(&decoder, &msg.msgType) < 0) return -1;
if (tDecodeI32(&decoder, &msg.msgLen) < 0) return -1;
if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1;
if (NULL == taosArrayPush(pReq->pMsgs, &msg)) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pRsps);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SBatchRspMsg *pMsg = taosArrayGet(pRsp->pRsps, i);
if (tEncodeI32(&encoder, pMsg->reqType) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->msgIdx) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->msgLen) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->rspCode) < 0) return -1;
if (tEncodeBinary(&encoder, pMsg->msg, pMsg->msgLen) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num <= 0) {
pRsp->pRsps = NULL;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
pRsp->pRsps = taosArrayInit(num, sizeof(SBatchRspMsg));
if (NULL == pRsp->pRsps) return -1;
for (int32_t i = 0; i < num; ++i) {
SBatchRspMsg msg = {0};
if (tDecodeI32(&decoder, &msg.reqType) < 0) return -1;
if (tDecodeI32(&decoder, &msg.msgIdx) < 0) return -1;
if (tDecodeI32(&decoder, &msg.msgLen) < 0) return -1;
if (tDecodeI32(&decoder, &msg.rspCode) < 0) return -1;
if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1;
if (NULL == taosArrayPush(pRsp->pRsps, &msg)) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......
......@@ -325,9 +325,14 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqHbReq *pReq = (SMqHbReq *)pMsg->pCont;
int64_t consumerId = be64toh(pReq->consumerId);
SMqHbReq req = {0};
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int64_t consumerId = req.consumerId;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer %" PRId64 " not exist", consumerId);
......@@ -359,10 +364,16 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont;
SMqAskEpReq req = {0};
SMqAskEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId);
int32_t epoch = ntohl(pReq->epoch);
if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int64_t consumerId = req.consumerId;
int32_t epoch = req.epoch;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
......@@ -370,7 +381,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
return -1;
}
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0);
atomic_store_32(&pConsumer->hbStatus, 0);
......
......@@ -63,81 +63,60 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
return code;
}
static FORCE_INLINE void mnodeFreeSBatchRspMsg(void* p) {
if (NULL == p) {
return;
}
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
rpcFreeCont(pRsp->msg);
}
int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
int32_t code = 0;
int32_t offset = 0;
int32_t rspSize = 0;
SBatchReq *batchReq = (SBatchReq *)pMsg->pCont;
int32_t msgNum = ntohl(batchReq->msgNum);
offset += sizeof(SBatchReq);
SBatchReq batchReq = {0};
SBatchMsg req = {0};
SBatchRsp rsp = {0};
SBatchRspMsg rsp = {0};
SBatchRsp batchRsp = {0};
SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0};
void *pRsp = NULL;
SMnode *pMnode = pMsg->info.node;
if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) {
code = TSDB_CODE_OUT_OF_MEMORY;
mError("tDeserializeSBatchReq failed");
goto _exit;
}
int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG;
mError("too many msgs %d in mnode batch meta req", msgNum);
goto _exit;
}
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) {
batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg));
if (NULL == batchRsp.pRsps) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < msgNum; ++i) {
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgType);
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
SBatchMsg* req = taosArrayGet(batchReq.pMsgs, i);
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgLen);
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msg = (char *)pMsg->pCont + offset;
offset += req.msgLen;
reqMsg.msgType = req.msgType;
reqMsg.pCont = req.msg;
reqMsg.contLen = req.msgLen;
reqMsg.msgType = req->msgType;
reqMsg.pCont = req->msg;
reqMsg.contLen = req->msgLen;
reqMsg.info.rsp = NULL;
reqMsg.info.rspLen = 0;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req.msgType)];
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req->msgType)];
if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
taosArrayDestroy(batchRsp);
taosArrayDestroy(batchRsp.pRsps);
return -1;
}
......@@ -146,49 +125,29 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
} else {
rsp.rspCode = 0;
}
rsp.msgIdx = req.msgIdx;
rsp.msgIdx = req->msgIdx;
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.info.rspLen;
rsp.msg = reqMsg.info.rsp;
taosArrayPush(batchRsp, &rsp);
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
taosArrayPush(batchRsp.pRsps, &rsp);
}
rspSize += sizeof(int32_t);
offset = 0;
rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);
if (rspSize < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*(int32_t *)((char *)pRsp + offset) = htonl(msgNum);
offset += sizeof(msgNum);
for (int32_t i = 0; i < msgNum; ++i) {
SBatchRsp *p = taosArrayGet(batchRsp, i);
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
if (p->msg != NULL) {
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
}
rpcFreeCont(p->msg);
if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
taosArrayDestroy(batchRsp);
batchRsp = NULL;
_exit:
pMsg->info.rsp = pRsp;
......@@ -198,7 +157,8 @@ _exit:
mError("mnd get batch meta failed cause of %s", tstrerror(code));
}
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp);
taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg);
taosArrayDestroyEx(batchRsp.pRsps, mnodeFreeSBatchRspMsg);
return code;
}
......
......@@ -264,77 +264,55 @@ _exit:
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void* p) {
if (NULL == p) {
return;
}
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
rpcFreeCont(pRsp->msg);
}
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t offset = 0;
int32_t rspSize = 0;
SBatchReq *batchReq = (SBatchReq *)pMsg->pCont;
int32_t msgNum = ntohl(batchReq->msgNum);
offset += sizeof(SBatchReq);
SBatchMsg req = {0};
SBatchRsp rsp = {0};
SBatchReq batchReq = {0};
SBatchMsg *req = NULL;
SBatchRspMsg rsp = {0};
SBatchRsp batchRsp = {0};
SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0};
void *pRsp = NULL;
if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) {
code = TSDB_CODE_OUT_OF_MEMORY;
qError("tDeserializeSBatchReq failed");
goto _exit;
}
int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG;
qError("too many msgs %d in vnode batch meta req", msgNum);
goto _exit;
}
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) {
batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg));
if (NULL == batchRsp.pRsps) {
code = TSDB_CODE_OUT_OF_MEMORY;
qError("taosArrayInit %d SBatchRspMsg failed", msgNum);
goto _exit;
}
for (int32_t i = 0; i < msgNum; ++i) {
if (offset >= pMsg->contLen) {
qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
if (offset >= pMsg->contLen) {
qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgType);
if (offset >= pMsg->contLen) {
qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req = taosArrayGet(batchReq.pMsgs, i);
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgLen);
reqMsg.msgType = req->msgType;
reqMsg.pCont = req->msg;
reqMsg.contLen = req->msgLen;
if (offset >= pMsg->contLen) {
qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msg = (char *)pMsg->pCont + offset;
offset += req.msgLen;
reqMsg.msgType = req.msgType;
reqMsg.pCont = req.msg;
reqMsg.contLen = req.msgLen;
switch (req.msgType) {
switch (req->msgType) {
case TDMT_VND_TABLE_META:
vnodeGetTableMeta(pVnode, &reqMsg, false);
break;
......@@ -342,63 +320,40 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
vnodeGetTableCfg(pVnode, &reqMsg, false);
break;
default:
qError("invalid req msgType %d", req.msgType);
qError("invalid req msgType %d", req->msgType);
reqMsg.code = TSDB_CODE_INVALID_MSG;
reqMsg.pCont = NULL;
reqMsg.contLen = 0;
break;
}
rsp.msgIdx = req.msgIdx;
rsp.msgIdx = req->msgIdx;
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.contLen;
rsp.rspCode = reqMsg.code;
rsp.msg = reqMsg.pCont;
taosArrayPush(batchRsp, &rsp);
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
taosArrayPush(batchRsp.pRsps, &rsp);
}
rspSize += sizeof(int32_t);
offset = 0;
if (rspSize > MAX_META_BATCH_RSP_SIZE) {
qError("rspSize:%d overload", rspSize);
code = TSDB_CODE_INVALID_MSG_LEN;
rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);
if (rspSize < 0) {
qError("tSerializeSBatchRsp failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) {
qError("rpcMallocCont %d failed", rspSize);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*(int32_t *)((char *)pRsp + offset) = htonl(msgNum);
offset += sizeof(msgNum);
for (int32_t i = 0; i < msgNum; ++i) {
SBatchRsp *p = taosArrayGet(batchRsp, i);
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
if (p->msg) {
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
}
taosMemoryFreeClear(p->msg);
if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) {
qError("tSerializeSBatchRsp %d failed", rspSize);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
taosArrayDestroy(batchRsp);
batchRsp = NULL;
_exit:
rspMsg.info = pMsg->info;
......@@ -411,7 +366,8 @@ _exit:
qError("vnd get batch meta failed cause of %s", tstrerror(code));
}
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp);
taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg);
taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);
tmsgSendRsp(&rspMsg);
......
......@@ -234,7 +234,6 @@ typedef struct SCatalog {
typedef struct SCtgBatch {
int32_t batchId;
int32_t msgType;
int32_t msgSize;
SArray* pMsgs;
SRequestConnInfo conn;
char dbFName[TSDB_DB_FNAME_LEN];
......
......@@ -26,19 +26,29 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
SCatalog* pCtg = pJob->pCtg;
int32_t taskNum = taosArrayGetSize(cbParam->taskId);
SDataBuf taskMsg = *pMsg;
int32_t offset = 0;
int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
int32_t msgNum = 0;
SBatchRsp batchRsp = {0};
SBatchRspMsg rsp = {0};
SBatchRspMsg *pRsp = NULL;
if (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) {
if (tDeserializeSBatchRsp(pMsg->pData, pMsg->len, &batchRsp) < 0) {
ctgError("tDeserializeSBatchRsp failed, msgLen:%d", pMsg->len);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
msgNum = taosArrayGetSize(batchRsp.pRsps);
}
ASSERT(taskNum == msgNum || 0 == msgNum);
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
TMSG_INFO(cbParam->reqType + 1));
offset += sizeof(msgNum);
SBatchRsp rsp = {0};
SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pBatchs) {
ctgError("taosHashInit %d batch failed", taskNum);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < taskNum; ++i) {
......@@ -46,25 +56,18 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i);
SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
if (msgNum > 0) {
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.reqType);
rsp.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.msgIdx);
rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.msgLen);
rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
offset += sizeof(rsp.rspCode);
rsp.msg = ((char*)pMsg->pData) + offset;
offset += rsp.msgLen;
taskMsg.msgType = rsp.reqType;
taskMsg.pData = rsp.msg;
taskMsg.len = rsp.msgLen;
ASSERT(rsp.msgIdx == *msgIdx);
pRsp = taosArrayGet(batchRsp.pRsps, i);
taskMsg.msgType = pRsp->reqType;
taskMsg.pData = pRsp->msg;
taskMsg.len = pRsp->msgLen;
ASSERT(pRsp->msgIdx == *msgIdx);
} else {
rsp.msgIdx = *msgIdx;
rsp.reqType = -1;
pRsp = &rsp;
pRsp->msgIdx = *msgIdx;
pRsp->reqType = -1;
pRsp->rspCode = 0;
taskMsg.msgType = -1;
taskMsg.pData = NULL;
taskMsg.len = 0;
......@@ -72,20 +75,22 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = rsp.msgIdx;
tReq.msgIdx = pRsp->msgIdx;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
pMsgCtx->pBatchs = pBatchs;
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode));
}
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
_return:
taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);
ctgFreeBatchs(pBatchs);
CTG_RET(code);
}
......@@ -481,7 +486,6 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) {
SName* pName = NULL;
......@@ -533,8 +537,6 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) {
SName* pName = NULL;
if (TDMT_VND_TABLE_CFG == msgType) {
......@@ -570,38 +572,35 @@ _return:
return code;
}
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
*msg = taosMemoryCalloc(1, pBatch->msgSize);
if (NULL == (*msg)) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t offset = 0;
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t *pSize) {
int32_t num = taosArrayGetSize(pBatch->pMsgs);
SBatchReq* pBatchReq = (SBatchReq*)(*msg);
if (num >= CTG_MAX_REQ_IN_BATCH) {
qError("too many msgs %d in one batch request", num);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
pBatchReq->header.vgId = htonl(vgId);
pBatchReq->msgNum = htonl(num);
offset += sizeof(SBatchReq);
SBatchReq batchReq = {0};
batchReq.header.vgId = vgId;
batchReq.pMsgs = pBatch->pMsgs;
int32_t msgSize = tSerializeSBatchReq(NULL, 0, &batchReq);
if (msgSize < 0) {
qError("tSerializeSBatchReq failed");
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < num; ++i) {
SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx);
offset += sizeof(pReq->msgIdx);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType);
offset += sizeof(pReq->msgType);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen);
offset += sizeof(pReq->msgLen);
memcpy((char*)(*msg) + offset, pReq->msg, pReq->msgLen);
offset += pReq->msgLen;
*msg = taosMemoryCalloc(1, msgSize);
if (NULL == (*msg)) {
qError("calloc batchReq msg failed, size:%d", msgSize);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (tSerializeSBatchReq(*msg, msgSize, &batchReq) < 0) {
qError("tSerializeSBatchReq failed");
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
ASSERT(pBatch->msgSize == offset);
*pSize = msgSize;
qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num);
......@@ -616,12 +615,13 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
size_t len = 0;
int32_t* vgId = taosHashGetKey(p, &len);
SCtgBatch* pBatch = (SCtgBatch*)p;
int32_t msgSize = 0;
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize));
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
pBatch->dbFName, *vgId, pBatch->msgType, msg, msgSize);
pBatch->pTaskIds = NULL;
CTG_ERR_JRET(code);
......
......@@ -147,7 +147,7 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr
uint32_t hashVal) {
SHashNode *pNode = pe->next;
while (pNode) {
atomic_add_fetch_64(&pHashObj->compTimes, 1);
//atomic_add_fetch_64(&pHashObj->compTimes, 1);
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
pNode->removed == 0) {
assert(pNode->hashVal == hashVal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册