提交 49917d51 编写于 作者: D dapan1121

enh: refactor batch meta msg

上级 fc19382a
...@@ -3168,8 +3168,7 @@ typedef struct { ...@@ -3168,8 +3168,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
int32_t msgNum; SArray* pMsgs; //SArray<SBatchMsg>
SBatchMsg msg[];
} SBatchReq; } SBatchReq;
typedef struct { typedef struct {
...@@ -3178,14 +3177,23 @@ typedef struct { ...@@ -3178,14 +3177,23 @@ typedef struct {
int32_t msgLen; int32_t msgLen;
int32_t rspCode; int32_t rspCode;
void* msg; void* msg;
} SBatchRspMsg;
typedef struct {
SArray* pRsps; //SArray<SBatchRspMsg>
} SBatchRsp; } SBatchRsp;
static FORCE_INLINE void tFreeSBatchRsp(void* p) { int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq);
int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq);
int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp);
int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp);
static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
if (NULL == p) { if (NULL == p) {
return; return;
} }
SBatchRsp* pRsp = (SBatchRsp*)p; SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
taosMemoryFree(pRsp->msg); taosMemoryFree(pRsp->msg);
} }
......
...@@ -351,7 +351,7 @@ typedef struct SVgDataBlocks { ...@@ -351,7 +351,7 @@ typedef struct SVgDataBlocks {
SVgroupInfo vg; SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block int32_t numOfTables; // number of tables in current submit block
uint32_t size; uint32_t size;
void* pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ... void* pData; // SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks; } SVgDataBlocks;
typedef void (*FFreeDataBlockHash)(SHashObj*); typedef void (*FFreeDataBlockHash)(SHashObj*);
......
...@@ -4446,6 +4446,139 @@ void tFreeSExplainRsp(SExplainRsp *pRsp) { ...@@ -4446,6 +4446,139 @@ void tFreeSExplainRsp(SExplainRsp *pRsp) {
taosMemoryFreeClear(pRsp->subplanInfo); taosMemoryFreeClear(pRsp->subplanInfo);
} }
int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pReq->pMsgs);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SBatchMsg *pMsg = taosArrayGet(pReq->pMsgs, i);
if (tEncodeI32(&encoder, pMsg->msgIdx) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->msgType) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->msgLen) < 0) return -1;
if (tEncodeBinary(&encoder, pMsg->msg, pMsg->msgLen) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->header.vgId;
pHead->contLen = pReq->header.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num <= 0) {
pReq->pMsgs = NULL;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
pReq->pMsgs = taosArrayInit(num, sizeof(SBatchMsg));
if (NULL == pReq->pMsgs) return -1;
for (int32_t i = 0; i < num; ++i) {
SBatchMsg msg = {0};
if (tDecodeI32(&decoder, &msg.msgIdx) < 0) return -1;
if (tDecodeI32(&decoder, &msg.msgType) < 0) return -1;
if (tDecodeI32(&decoder, &msg.msgLen) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t**)&msg.msg, NULL) < 0) return -1;
if (NULL == taosArrayPush(pReq->pMsgs, &msg)) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pRsps);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SBatchRspMsg *pMsg = taosArrayGet(pRsp->pRsps, i);
if (tEncodeI32(&encoder, pMsg->reqType) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->msgIdx) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->msgLen) < 0) return -1;
if (tEncodeI32(&encoder, pMsg->rspCode) < 0) return -1;
if (tEncodeBinary(&encoder, pMsg->msg, pMsg->msgLen) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num <= 0) {
pRsp->pRsps = NULL;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
pRsp->pRsps = taosArrayInit(num, sizeof(SBatchRspMsg));
if (NULL == pRsp->pRsps) return -1;
for (int32_t i = 0; i < num; ++i) {
SBatchRspMsg msg = {0};
if (tDecodeI32(&decoder, &msg.reqType) < 0) return -1;
if (tDecodeI32(&decoder, &msg.msgIdx) < 0) return -1;
if (tDecodeI32(&decoder, &msg.msgLen) < 0) return -1;
if (tDecodeI32(&decoder, &msg.rspCode) < 0) return -1;
if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1;
if (NULL == taosArrayPush(pRsp->pRsps, &msg)) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) { if (buf != NULL) {
......
...@@ -63,81 +63,60 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { ...@@ -63,81 +63,60 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
return code; return code;
} }
static FORCE_INLINE void mnodeFreeSBatchRspMsg(void* p) {
if (NULL == p) {
return;
}
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
rpcFreeCont(pRsp->msg);
}
int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t offset = 0;
int32_t rspSize = 0; int32_t rspSize = 0;
SBatchReq *batchReq = (SBatchReq *)pMsg->pCont; SBatchReq batchReq = {0};
int32_t msgNum = ntohl(batchReq->msgNum);
offset += sizeof(SBatchReq);
SBatchMsg req = {0}; SBatchMsg req = {0};
SBatchRsp rsp = {0}; SBatchRspMsg rsp = {0};
SBatchRsp batchRsp = {0};
SRpcMsg reqMsg = *pMsg; SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0};
void *pRsp = NULL; void *pRsp = NULL;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) {
code = TSDB_CODE_OUT_OF_MEMORY;
mError("tDeserializeSBatchReq failed");
goto _exit;
}
int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
if (msgNum >= MAX_META_MSG_IN_BATCH) { if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
mError("too many msgs %d in mnode batch meta req", msgNum); mError("too many msgs %d in mnode batch meta req", msgNum);
goto _exit; goto _exit;
} }
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg));
if (NULL == batchRsp) { if (NULL == batchRsp.pRsps) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
for (int32_t i = 0; i < msgNum; ++i) { for (int32_t i = 0; i < msgNum; ++i) {
if (offset >= pMsg->contLen) { SBatchMsg* req = taosArrayGet(batchReq.pMsgs, i);
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgType);
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset)); reqMsg.msgType = req->msgType;
offset += sizeof(req.msgLen); reqMsg.pCont = req->msg;
if (offset >= pMsg->contLen) { reqMsg.contLen = req->msgLen;
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msg = (char *)pMsg->pCont + offset;
offset += req.msgLen;
reqMsg.msgType = req.msgType;
reqMsg.pCont = req.msg;
reqMsg.contLen = req.msgLen;
reqMsg.info.rsp = NULL; reqMsg.info.rsp = NULL;
reqMsg.info.rspLen = 0; reqMsg.info.rspLen = 0;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req.msgType)]; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req->msgType)];
if (fp == NULL) { if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
taosArrayDestroy(batchRsp); taosArrayDestroy(batchRsp.pRsps);
return -1; return -1;
} }
...@@ -146,49 +125,29 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { ...@@ -146,49 +125,29 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
} else { } else {
rsp.rspCode = 0; rsp.rspCode = 0;
} }
rsp.msgIdx = req.msgIdx; rsp.msgIdx = req->msgIdx;
rsp.reqType = reqMsg.msgType; rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.info.rspLen; rsp.msgLen = reqMsg.info.rspLen;
rsp.msg = reqMsg.info.rsp; rsp.msg = reqMsg.info.rsp;
taosArrayPush(batchRsp, &rsp); taosArrayPush(batchRsp.pRsps, &rsp);
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
} }
rspSize += sizeof(int32_t); rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);
offset = 0; if (rspSize < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pRsp = rpcMallocCont(rspSize); pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) { if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) {
*(int32_t *)((char *)pRsp + offset) = htonl(msgNum); code = TSDB_CODE_OUT_OF_MEMORY;
offset += sizeof(msgNum); goto _exit;
for (int32_t i = 0; i < msgNum; ++i) {
SBatchRsp *p = taosArrayGet(batchRsp, i);
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
if (p->msg != NULL) {
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
}
rpcFreeCont(p->msg);
} }
taosArrayDestroy(batchRsp);
batchRsp = NULL;
_exit: _exit:
pMsg->info.rsp = pRsp; pMsg->info.rsp = pRsp;
...@@ -198,7 +157,7 @@ _exit: ...@@ -198,7 +157,7 @@ _exit:
mError("mnd get batch meta failed cause of %s", tstrerror(code)); mError("mnd get batch meta failed cause of %s", tstrerror(code));
} }
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); taosArrayDestroyEx(batchRsp.pRsps, mnodeFreeSBatchRspMsg);
return code; return code;
} }
......
...@@ -264,77 +264,54 @@ _exit: ...@@ -264,77 +264,54 @@ _exit:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void* p) {
if (NULL == p) {
return;
}
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
rpcFreeCont(pRsp->msg);
}
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t offset = 0;
int32_t rspSize = 0; int32_t rspSize = 0;
SBatchReq *batchReq = (SBatchReq *)pMsg->pCont; SBatchReq batchReq = {0};
int32_t msgNum = ntohl(batchReq->msgNum); SBatchMsg *req = NULL;
offset += sizeof(SBatchReq); SBatchRspMsg rsp = {0};
SBatchMsg req = {0}; SBatchRsp batchRsp = {0};
SBatchRsp rsp = {0};
SRpcMsg reqMsg = *pMsg; SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0}; SRpcMsg rspMsg = {0};
void *pRsp = NULL; void *pRsp = NULL;
if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) {
code = TSDB_CODE_OUT_OF_MEMORY;
qError("tDeserializeSBatchReq failed");
goto _exit;
}
int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
if (msgNum >= MAX_META_MSG_IN_BATCH) { if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
qError("too many msgs %d in vnode batch meta req", msgNum); qError("too many msgs %d in vnode batch meta req", msgNum);
goto _exit; goto _exit;
} }
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg));
if (NULL == batchRsp) { if (NULL == batchRsp.pRsps) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
for (int32_t i = 0; i < msgNum; ++i) { for (int32_t i = 0; i < msgNum; ++i) {
if (offset >= pMsg->contLen) { req = taosArrayGet(batchReq.pMsgs, i);
qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN; reqMsg.msgType = req->msgType;
taosArrayDestroy(batchRsp); reqMsg.pCont = req->msg;
return -1; reqMsg.contLen = req->msgLen;
}
switch (req->msgType) {
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
if (offset >= pMsg->contLen) {
qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgType);
if (offset >= pMsg->contLen) {
qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgLen);
if (offset >= pMsg->contLen) {
qError("vnode offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG_LEN;
taosArrayDestroy(batchRsp);
return -1;
}
req.msg = (char *)pMsg->pCont + offset;
offset += req.msgLen;
reqMsg.msgType = req.msgType;
reqMsg.pCont = req.msg;
reqMsg.contLen = req.msgLen;
switch (req.msgType) {
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
vnodeGetTableMeta(pVnode, &reqMsg, false); vnodeGetTableMeta(pVnode, &reqMsg, false);
break; break;
...@@ -342,63 +319,37 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -342,63 +319,37 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
vnodeGetTableCfg(pVnode, &reqMsg, false); vnodeGetTableCfg(pVnode, &reqMsg, false);
break; break;
default: default:
qError("invalid req msgType %d", req.msgType); qError("invalid req msgType %d", req->msgType);
reqMsg.code = TSDB_CODE_INVALID_MSG; reqMsg.code = TSDB_CODE_INVALID_MSG;
reqMsg.pCont = NULL; reqMsg.pCont = NULL;
reqMsg.contLen = 0; reqMsg.contLen = 0;
break; break;
} }
rsp.msgIdx = req.msgIdx; rsp.msgIdx = req->msgIdx;
rsp.reqType = reqMsg.msgType; rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.contLen; rsp.msgLen = reqMsg.contLen;
rsp.rspCode = reqMsg.code; rsp.rspCode = reqMsg.code;
rsp.msg = reqMsg.pCont; rsp.msg = reqMsg.pCont;
taosArrayPush(batchRsp, &rsp); taosArrayPush(batchRsp.pRsps, &rsp);
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
} }
rspSize += sizeof(int32_t); rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);
offset = 0; if (rspSize < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
if (rspSize > MAX_META_BATCH_RSP_SIZE) {
qError("rspSize:%d overload", rspSize);
code = TSDB_CODE_INVALID_MSG_LEN;
goto _exit; goto _exit;
} }
pRsp = rpcMallocCont(rspSize); pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) { if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp)) {
*(int32_t *)((char *)pRsp + offset) = htonl(msgNum); code = TSDB_CODE_OUT_OF_MEMORY;
offset += sizeof(msgNum); goto _exit;
for (int32_t i = 0; i < msgNum; ++i) {
SBatchRsp *p = taosArrayGet(batchRsp, i);
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgIdx);
offset += sizeof(p->msgIdx);
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
if (p->msg) {
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
}
taosMemoryFreeClear(p->msg);
} }
taosArrayDestroy(batchRsp);
batchRsp = NULL;
_exit: _exit:
rspMsg.info = pMsg->info; rspMsg.info = pMsg->info;
...@@ -411,7 +362,7 @@ _exit: ...@@ -411,7 +362,7 @@ _exit:
qError("vnd get batch meta failed cause of %s", tstrerror(code)); qError("vnd get batch meta failed cause of %s", tstrerror(code));
} }
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp); taosArrayDestroyEx(batchRsp.pRsps, vnodeFreeSBatchRspMsg);
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
......
...@@ -234,7 +234,6 @@ typedef struct SCatalog { ...@@ -234,7 +234,6 @@ typedef struct SCatalog {
typedef struct SCtgBatch { typedef struct SCtgBatch {
int32_t batchId; int32_t batchId;
int32_t msgType; int32_t msgType;
int32_t msgSize;
SArray* pMsgs; SArray* pMsgs;
SRequestConnInfo conn; SRequestConnInfo conn;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
......
...@@ -26,19 +26,29 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -26,19 +26,29 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
SCatalog* pCtg = pJob->pCtg; SCatalog* pCtg = pJob->pCtg;
int32_t taskNum = taosArrayGetSize(cbParam->taskId); int32_t taskNum = taosArrayGetSize(cbParam->taskId);
SDataBuf taskMsg = *pMsg; SDataBuf taskMsg = *pMsg;
int32_t offset = 0; int32_t msgNum = 0;
int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; SBatchRsp batchRsp = {0};
SBatchRspMsg rsp = {0};
SBatchRspMsg *pRsp = NULL;
if (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) {
if (tDeserializeSBatchRsp(pMsg->pData, pMsg->len, &batchRsp) < 0) {
ctgError("tDeserializeSBatchRsp failed, msgLen:%d", pMsg->len);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
msgNum = taosArrayGetSize(batchRsp.pRsps);
}
ASSERT(taskNum == msgNum || 0 == msgNum); ASSERT(taskNum == msgNum || 0 == msgNum);
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
TMSG_INFO(cbParam->reqType + 1)); TMSG_INFO(cbParam->reqType + 1));
offset += sizeof(msgNum);
SBatchRsp rsp = {0};
SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pBatchs) { if (NULL == pBatchs) {
ctgError("taosHashInit %d batch failed", taskNum); ctgError("taosHashInit %d batch failed", taskNum);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
...@@ -46,25 +56,18 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -46,25 +56,18 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i); int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i);
SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId); SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
if (msgNum > 0) { if (msgNum > 0) {
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); pRsp = taosArrayGet(batchRsp.pRsps, i);
offset += sizeof(rsp.reqType);
rsp.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); taskMsg.msgType = pRsp->reqType;
offset += sizeof(rsp.msgIdx); taskMsg.pData = pRsp->msg;
rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); taskMsg.len = pRsp->msgLen;
offset += sizeof(rsp.msgLen);
rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); ASSERT(pRsp->msgIdx == *msgIdx);
offset += sizeof(rsp.rspCode);
rsp.msg = ((char*)pMsg->pData) + offset;
offset += rsp.msgLen;
taskMsg.msgType = rsp.reqType;
taskMsg.pData = rsp.msg;
taskMsg.len = rsp.msgLen;
ASSERT(rsp.msgIdx == *msgIdx);
} else { } else {
rsp.msgIdx = *msgIdx; pRsp = &rsp;
rsp.reqType = -1; pRsp->msgIdx = *msgIdx;
pRsp->reqType = -1;
pRsp->rspCode = 0;
taskMsg.msgType = -1; taskMsg.msgType = -1;
taskMsg.pData = NULL; taskMsg.pData = NULL;
taskMsg.len = 0; taskMsg.len = 0;
...@@ -72,20 +75,22 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -72,20 +75,22 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = rsp.msgIdx; tReq.msgIdx = pRsp->msgIdx;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
pMsgCtx->pBatchs = pBatchs; pMsgCtx->pBatchs = pBatchs;
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode));
} }
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
_return: _return:
taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);
ctgFreeBatchs(pBatchs); ctgFreeBatchs(pBatchs);
CTG_RET(code); CTG_RET(code);
} }
...@@ -481,7 +486,6 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -481,7 +486,6 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) { if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) { if (vgId > 0) {
SName* pName = NULL; SName* pName = NULL;
...@@ -533,8 +537,6 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -533,8 +537,6 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) { if (vgId > 0) {
SName* pName = NULL; SName* pName = NULL;
if (TDMT_VND_TABLE_CFG == msgType) { if (TDMT_VND_TABLE_CFG == msgType) {
...@@ -570,38 +572,35 @@ _return: ...@@ -570,38 +572,35 @@ _return:
return code; return code;
} }
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t *pSize) {
*msg = taosMemoryCalloc(1, pBatch->msgSize);
if (NULL == (*msg)) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t offset = 0;
int32_t num = taosArrayGetSize(pBatch->pMsgs); int32_t num = taosArrayGetSize(pBatch->pMsgs);
SBatchReq* pBatchReq = (SBatchReq*)(*msg);
if (num >= CTG_MAX_REQ_IN_BATCH) { if (num >= CTG_MAX_REQ_IN_BATCH) {
qError("too many msgs %d in one batch request", num); qError("too many msgs %d in one batch request", num);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
pBatchReq->header.vgId = htonl(vgId); SBatchReq batchReq = {0};
pBatchReq->msgNum = htonl(num);
offset += sizeof(SBatchReq); batchReq.header.vgId = vgId;
batchReq.pMsgs = pBatch->pMsgs;
for (int32_t i = 0; i < num; ++i) {
SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i); int32_t msgSize = tSerializeSBatchReq(NULL, 0, &batchReq);
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx); if (msgSize < 0) {
offset += sizeof(pReq->msgIdx); qError("tSerializeSBatchReq failed");
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
offset += sizeof(pReq->msgType); }
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen);
offset += sizeof(pReq->msgLen); *msg = taosMemoryCalloc(1, msgSize);
memcpy((char*)(*msg) + offset, pReq->msg, pReq->msgLen); if (NULL == (*msg)) {
offset += pReq->msgLen; qError("calloc batchReq msg failed, size:%d", msgSize);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (tSerializeSBatchReq(*msg, msgSize, &batchReq) < 0) {
qError("tSerializeSBatchReq failed");
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
ASSERT(pBatch->msgSize == offset); *pSize = msgSize;
qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num); qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num);
...@@ -616,12 +615,13 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) { ...@@ -616,12 +615,13 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
size_t len = 0; size_t len = 0;
int32_t* vgId = taosHashGetKey(p, &len); int32_t* vgId = taosHashGetKey(p, &len);
SCtgBatch* pBatch = (SCtgBatch*)p; SCtgBatch* pBatch = (SCtgBatch*)p;
int32_t msgSize = 0;
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize));
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs, code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize); pBatch->dbFName, *vgId, pBatch->msgType, msg, msgSize);
pBatch->pTaskIds = NULL; pBatch->pTaskIds = NULL;
CTG_ERR_JRET(code); CTG_ERR_JRET(code);
......
...@@ -147,7 +147,7 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr ...@@ -147,7 +147,7 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr
uint32_t hashVal) { uint32_t hashVal) {
SHashNode *pNode = pe->next; SHashNode *pNode = pe->next;
while (pNode) { while (pNode) {
atomic_add_fetch_64(&pHashObj->compTimes, 1); //atomic_add_fetch_64(&pHashObj->compTimes, 1);
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
pNode->removed == 0) { pNode->removed == 0) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册