From e89dc53200a1451b28ef546443c14e47a0eced31 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 14 Feb 2022 15:27:38 +0800 Subject: [PATCH] seralize db msg --- include/common/tmsg.h | 131 +++--- source/client/src/clientHb.c | 39 +- source/common/src/tmsg.c | 277 ++++++++++--- source/dnode/mnode/impl/inc/mndDb.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 385 +++++++++++------- source/dnode/mnode/impl/src/mndProfile.c | 37 +- source/dnode/mnode/impl/src/mndTrans.c | 8 +- source/dnode/mnode/impl/test/db/db.cpp | 103 ++--- .../dnode/mnode/impl/test/profile/profile.cpp | 26 +- source/libs/catalog/test/catalogTests.cpp | 55 +-- source/libs/qcom/src/querymsg.c | 55 +-- 11 files changed, 701 insertions(+), 417 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1bd347fc24..52fa0c28f8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -328,6 +328,26 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { return buf; } +static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) { + if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; + if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; + if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1; + } + return 0; +} + +static FORCE_INLINE int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp) { + if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; + if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1; + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1; + } + return 0; +} + typedef struct { int32_t acctId; int64_t clusterId; @@ -612,6 +632,27 @@ typedef struct { int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); +typedef struct { + char db[TSDB_DB_FNAME_LEN]; + uint64_t uid; + int32_t vgVersion; + int32_t vgNum; + int8_t hashMethod; + SArray* pVgroupInfos; // Array of SVgroupInfo +} SUseDbRsp; + +int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp); +int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp); +void tFreeSUsedbRsp(SUseDbRsp* pRsp); + +typedef struct { + SArray* pArray; // Array of SUseDbRsp +} SUseDbBatchRsp; + +int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp); +int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp); +void tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp); + typedef struct { char db[TSDB_DB_FNAME_LEN]; } SSyncDbReq, SCompactDbReq; @@ -841,15 +882,6 @@ typedef struct { char* data; } STagData; -typedef struct { - char db[TSDB_DB_FNAME_LEN]; - uint64_t uid; - int32_t vgVersion; - int32_t vgNum; - int8_t hashMethod; - SVgroupInfo vgroupInfo[]; -} SUseDbRsp; - /* * sql: show tables like '%a_%' * payload is the query condition, e.g., '%a_%' @@ -1535,43 +1567,30 @@ typedef struct { SArray* rsps; // SArray } SClientHbBatchRsp; -static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { - return taosIntHash_64(key, keyLen); -} - -int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); -void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); +static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); } -int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); -void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); - - -static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { - void *pIter = taosHashIterate(info, NULL); +static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { + void* pIter = taosHashIterate(info, NULL); while (pIter != NULL) { SKv* kv = (SKv*)pIter; - tfree(kv->value); - pIter = taosHashIterate(info, pIter); } } - -static FORCE_INLINE void tFreeClientHbReq(void *pReq) { +static FORCE_INLINE void tFreeClientHbReq(void* pReq) { SClientHbReq* req = (SClientHbReq*)pReq; if (req->info) { tFreeReqKvHash(req->info); - taosHashCleanup(req->info); } } -int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); -void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); +int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq); +int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq); static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { - SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; + SClientHbBatchReq* req = (SClientHbBatchReq*)pReq; if (deep) { taosArrayDestroyEx(req->reqs, tFreeClientHbReq); } else { @@ -1580,54 +1599,52 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { free(pReq); } -static FORCE_INLINE void tFreeClientKv(void *pKv) { - SKv *kv = (SKv *)pKv; +static FORCE_INLINE void tFreeClientKv(void* pKv) { + SKv* kv = (SKv*)pKv; if (kv) { tfree(kv->value); } } -static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) { +static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) { SClientHbRsp* rsp = (SClientHbRsp*)pRsp; if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); } - static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { - SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp; + SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp; taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp); } +int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp); +int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp); -int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); -void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); - -static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKv->key); - tlen += taosEncodeFixedI32(buf, pKv->valueLen); - tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); - return tlen; +static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) { + if (tEncodeI32(pEncoder, pKv->key) < 0) return -1; + if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1; + return 0; } -static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { - buf = taosDecodeFixedI32(buf, &pKv->key); - buf = taosDecodeFixedI32(buf, &pKv->valueLen); - buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); - return buf; +static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { + if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1; + if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1; + pKv->value = malloc(pKv->valueLen + 1); + if (pKv->value == NULL) return -1; + if (tDecodeCStrTo(pDecoder, (char*)pKv->value) < 0) return -1; + return 0; } -static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKey->connId); - tlen += taosEncodeFixedI32(buf, pKey->hbType); - return tlen; +static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) { + if (tEncodeI32(pEncoder, pKey->connId) < 0) return -1; + if (tEncodeI32(pEncoder, pKey->hbType) < 0) return -1; + return 0; } -static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { - buf = taosDecodeFixedI32(buf, &pKey->connId); - buf = taosDecodeFixedI32(buf, &pKey->hbType); - return buf; +static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) { + if (tDecodeI32(pDecoder, &pKey->connId) < 0) return -1; + if (tDecodeI32(pDecoder, &pKey->hbType) < 0) return -1; + return 0; } typedef struct SMqHbVgInfo { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 3e1af765b0..7920ae0658 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -30,14 +30,16 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t msgLen = 0; int32_t code = 0; - - while (msgLen < valueLen) { - SUseDbRsp *rsp = (SUseDbRsp *)((char *)value + msgLen); - rsp->vgVersion = ntohl(rsp->vgVersion); - rsp->vgNum = ntohl(rsp->vgNum); - rsp->uid = be64toh(rsp->uid); + SUseDbBatchRsp batchUseRsp = {0}; + if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); + for (int32_t i = 0; i < numOfBatchs; ++i) { + SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); if (rsp->vgVersion < 0) { @@ -52,22 +54,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog return TSDB_CODE_TSC_OUT_OF_MEMORY; } - for (int32_t i = 0; i < rsp->vgNum; ++i) { - rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId); - rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin); - rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd); - - for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) { - rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port); - } - - if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) { + for (int32_t j = 0; j < rsp->vgNum; ++j) { + SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); + if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { tscError("hash push failed, errno:%d", errno); taosHashCleanup(vgInfo.vgHash); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - } - + } + code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo); if (code) { taosHashCleanup(vgInfo.vgHash); @@ -201,9 +196,10 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code tfree(param); return -1; } + char *key = (char *)param; SClientHbBatchRsp pRsp = {0}; - tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp); + tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); int32_t rspNum = taosArrayGetSize(pRsp.rsps); @@ -416,7 +412,7 @@ static void* hbThreadFunc(void* param) { if (pReq == NULL) { continue; } - int tlen = tSerializeSClientHbBatchReq(NULL, pReq); + int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); void *buf = malloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -424,8 +420,7 @@ static void* hbThreadFunc(void* param) { hbClearReqInfo(pAppHbMgr); break; } - void *abuf = buf; - tSerializeSClientHbBatchReq(&abuf, pReq); + tSerializeSClientHbBatchReq(buf, tlen, pReq); SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 20e1da9d07..68924e1be4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -85,118 +85,156 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { } } -int32_t tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { - int32_t tlen = 0; - tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); +static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) { + if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; int32_t kvNum = taosHashGetSize(pReq->info); - tlen += taosEncodeFixedI32(buf, kvNum); - SKv *kv; + if (tEncodeI32(pEncoder, kvNum) < 0) return -1; void *pIter = taosHashIterate(pReq->info, NULL); while (pIter != NULL) { - kv = pIter; - tlen += taosEncodeSKv(buf, kv); - + SKv *kv = pIter; + if (tEncodeSKv(pEncoder, kv) < 0) return -1; pIter = taosHashIterate(pReq->info, pIter); } - return tlen; + + return 0; } -void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { - buf = taosDecodeSClientHbKey(buf, &pReq->connKey); +static int32_t tDeserializeSClientHbReq(SCoder *pDecoder, SClientHbReq *pReq) { + if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1; - // TODO: error handling - int32_t kvNum; - buf = taosDecodeFixedI32(buf, &kvNum); + int32_t kvNum = 0; + if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; if (pReq->info == NULL) { pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } + if (pReq->info == NULL) return -1; for (int32_t i = 0; i < kvNum; i++) { - SKv kv; - buf = taosDecodeSKv(buf, &kv); + SKv kv = {0}; + if (tDecodeSKv(pDecoder, &kv) < 0) return -1; taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); } - return buf; + return 0; } -int32_t tSerializeSClientHbRsp(void **buf, const SClientHbRsp *pRsp) { - int32_t tlen = 0; +static int32_t tSerializeSClientHbRsp(SCoder *pEncoder, const SClientHbRsp *pRsp) { + if (tEncodeSClientHbKey(pEncoder, &pRsp->connKey) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->status) < 0) return -1; + int32_t kvNum = taosArrayGetSize(pRsp->info); - tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey); - tlen += taosEncodeFixedI32(buf, pRsp->status); - tlen += taosEncodeFixedI32(buf, kvNum); + if (tEncodeI32(pEncoder, kvNum) < 0) return -1; for (int32_t i = 0; i < kvNum; i++) { - SKv *kv = (SKv *)taosArrayGet(pRsp->info, i); - tlen += taosEncodeSKv(buf, kv); + SKv *kv = taosArrayGet(pRsp->info, i); + if (tEncodeSKv(pEncoder, kv) < 0) return -1; } - return tlen; + + return 0; } -void *tDeserializeSClientHbRsp(void *buf, SClientHbRsp *pRsp) { +static int32_t tDeserializeSClientHbRsp(SCoder *pDecoder, SClientHbRsp *pRsp) { + if (tDecodeSClientHbKey(pDecoder, &pRsp->connKey) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->status) < 0) return -1; + int32_t kvNum = 0; - buf = taosDecodeSClientHbKey(buf, &pRsp->connKey); - buf = taosDecodeFixedI32(buf, &pRsp->status); - buf = taosDecodeFixedI32(buf, &kvNum); + if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); + if (pRsp->info == NULL) return -1; for (int32_t i = 0; i < kvNum; i++) { SKv kv = {0}; - buf = taosDecodeSKv(buf, &kv); + tDecodeSKv(pDecoder, &kv); taosArrayPush(pRsp->info, &kv); } - return buf; + return 0; } -int32_t tSerializeSClientHbBatchReq(void **buf, const SClientHbBatchReq *pBatchReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pBatchReq->reqId); +int32_t tSerializeSClientHbBatchReq(void *buf, int32_t bufLen, const SClientHbBatchReq *pBatchReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pBatchReq->reqId) < 0) return -1; + int32_t reqNum = taosArrayGetSize(pBatchReq->reqs); - tlen += taosEncodeFixedI32(buf, reqNum); + if (tEncodeI32(&encoder, reqNum) < 0) return -1; for (int32_t i = 0; i < reqNum; i++) { SClientHbReq *pReq = taosArrayGet(pBatchReq->reqs, i); - tlen += tSerializeSClientHbReq(buf, pReq); + if (tSerializeSClientHbReq(&encoder, pReq) < 0) return -1; } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); return tlen; } -void *tDeserializeSClientHbBatchReq(void *buf, SClientHbBatchReq *pBatchReq) { - buf = taosDecodeFixedI64(buf, &pBatchReq->reqId); +int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchReq *pBatchReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pBatchReq->reqId) < 0) return -1; + + int32_t reqNum = 0; + if (tDecodeI32(&decoder, &reqNum) < 0) return -1; if (pBatchReq->reqs == NULL) { - pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq)); + pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq)); } - - int32_t reqNum; - buf = taosDecodeFixedI32(buf, &reqNum); for (int32_t i = 0; i < reqNum; i++) { SClientHbReq req = {0}; - buf = tDeserializeSClientHbReq(buf, &req); + tDeserializeSClientHbReq(&decoder, &req); taosArrayPush(pBatchReq->reqs, &req); } - return buf; + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; } -int32_t tSerializeSClientHbBatchRsp(void **buf, const SClientHbBatchRsp *pBatchRsp) { - int32_t tlen = 0; - int32_t sz = taosArrayGetSize(pBatchRsp->rsps); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { +int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBatchRsp *pBatchRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pBatchRsp->reqId) < 0) return -1; + if (tEncodeI64(&encoder, pBatchRsp->rspId) < 0) return -1; + + int32_t rspNum = taosArrayGetSize(pBatchRsp->rsps); + if (tEncodeI32(&encoder, rspNum) < 0) return -1; + for (int32_t i = 0; i < rspNum; i++) { SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i); - tlen += tSerializeSClientHbRsp(buf, pRsp); + if (tSerializeSClientHbRsp(&encoder, pRsp) < 0) return -1; } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); return tlen; } -void *tDeserializeSClientHbBatchRsp(void *buf, SClientHbBatchRsp *pBatchRsp) { - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pBatchRsp->rsps = taosArrayInit(sz, sizeof(SClientHbRsp)); - for (int32_t i = 0; i < sz; i++) { +int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchRsp *pBatchRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pBatchRsp->reqId) < 0) return -1; + if (tDecodeI64(&decoder, &pBatchRsp->rspId) < 0) return -1; + + int32_t rspNum = 0; + if (tDecodeI32(&decoder, &rspNum) < 0) return -1; + if (pBatchRsp->rsps == NULL) { + pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbReq)); + } + for (int32_t i = 0; i < rspNum; i++) { SClientHbRsp rsp = {0}; - buf = tDeserializeSClientHbRsp(buf, &rsp); + tDeserializeSClientHbRsp(&decoder, &rsp); taosArrayPush(pBatchRsp->rsps, &rsp); } - return buf; + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; } int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { @@ -1306,4 +1344,129 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) { tCoderClear(&decoder); return 0; +} + +static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) { + if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1; + if (tEncodeU64(pEncoder, pRsp->uid) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->vgNum) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->hashMethod) < 0) return -1; + + for (int32_t i = 0; i < pRsp->vgNum; ++i) { + SVgroupInfo *pVgInfo = taosArrayGet(pRsp->pVgroupInfos, i); + if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1; + if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1; + if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1; + } + + return 0; +} + +int32_t tSerializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tSerializeSUseDbRspImp(&encoder, pRsp) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + if (tEncodeI32(&encoder, numOfBatch) < 0) return -1; + for (int32_t i = 0; i < numOfBatch; ++i) { + SUseDbRsp *pUsedbRsp = taosArrayGet(pRsp->pArray, i); + if (tSerializeSUseDbRspImp(&encoder, pUsedbRsp) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { + if (tDecodeCStrTo(pDecoder, pRsp->db) < 0) return -1; + if (tDecodeU64(pDecoder, &pRsp->uid) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->vgVersion) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->vgNum) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->hashMethod) < 0) return -1; + + pRsp->pVgroupInfos = taosArrayInit(pRsp->vgNum, sizeof(SVgroupInfo)); + if (pRsp->pVgroupInfos == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < pRsp->vgNum; ++i) { + SVgroupInfo vgInfo = {0}; + if (tDecodeI32(pDecoder, &vgInfo.vgId) < 0) return -1; + if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1; + if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1; + taosArrayPush(pRsp->pVgroupInfos, &vgInfo); + } + + return 0; +} + +int32_t tDeserializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDeserializeSUseDbRspImp(&decoder, pRsp) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1; + + pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SUseDbBatchRsp)); + if (pRsp->pArray == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < numOfBatch; ++i) { + SUseDbRsp usedbRsp = {0}; + if (tDeserializeSUseDbRspImp(&decoder, &usedbRsp) < 0) return -1; + taosArrayPush(pRsp->pArray, &usedbRsp); + } + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSUsedbRsp(SUseDbRsp *pRsp) { taosArrayDestroy(pRsp->pVgroupInfos); } + +void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) { + int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); + for (int32_t i = 0; i < numOfBatch; ++i) { + SUseDbRsp *pUsedbRsp = taosArrayGet(pRsp->pArray, i); + tFreeSUsedbRsp(pUsedbRsp); + } + + taosArrayDestroy(pRsp->pArray); } \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index 5a3e6ed26e..ff0095f9f4 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -26,7 +26,7 @@ int32_t mndInitDb(SMnode *pMnode); void mndCleanupDb(SMnode *pMnode); SDbObj *mndAcquireDb(SMnode *pMnode, char *db); void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); -int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen); +int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 1a01c3852a..56d743cd5a 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -634,43 +634,56 @@ UPDATE_DB_OVER: } static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SAlterDbReq *pAlter = pReq->rpcMsg.pCont; - pAlter->totalBlocks = htonl(pAlter->totalBlocks); - pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0); - pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1); - pAlter->daysToKeep2 = htonl(pAlter->daysToKeep2); - pAlter->fsyncPeriod = htonl(pAlter->fsyncPeriod); + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + SAlterDbReq alterReq = {0}; + + if (tDeserializeSAlterDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto ALTER_DB_OVER; + } - mDebug("db:%s, start to alter", pAlter->db); + mDebug("db:%s, start to alter", alterReq.db); - SDbObj *pDb = mndAcquireDb(pMnode, pAlter->db); + pDb = mndAcquireDb(pMnode, alterReq.db); if (pDb == NULL) { - mError("db:%s, failed to alter since %s", pAlter->db, terrstr()); - return TSDB_CODE_MND_DB_NOT_EXIST; + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + goto ALTER_DB_OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto ALTER_DB_OVER; + } + + if (mndCheckAlterDropCompactSyncDbAuth(pUser, pDb) != 0) { + goto ALTER_DB_OVER; } SDbObj dbObj = {0}; memcpy(&dbObj, pDb, sizeof(SDbObj)); - int32_t code = mndSetDbCfgFromAlterDbReq(&dbObj, pAlter); + code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq); if (code != 0) { - mndReleaseDb(pMnode, pDb); - mError("db:%s, failed to alter since %s", pAlter->db, tstrerror(code)); - return code; + goto ALTER_DB_OVER; } dbObj.cfgVersion++; dbObj.updateTime = taosGetTimestampMs(); code = mndUpdateDb(pMnode, pReq, pDb, &dbObj); - mndReleaseDb(pMnode, pDb); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; - if (code != 0) { - mError("db:%s, failed to alter since %s", pAlter->db, tstrerror(code)); - return code; +ALTER_DB_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); } - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + mndReleaseDb(pMnode, pDb); + mndReleaseUser(pMnode, pUser); + + return code; } static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { @@ -772,11 +785,18 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; - int32_t rspLen = sizeof(SDropDbRsp); - SDropDbRsp *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) goto DROP_DB_OVER; - memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); - pRsp->uid = htobe64(pDb->uid); + SDropDbRsp dropRsp = {0}; + memcpy(dropRsp.db, pDb->name, TSDB_DB_FNAME_LEN); + dropRsp.uid = pDb->uid; + + int32_t rspLen = tSerializeSDropDbRsp(NULL, 0, &dropRsp); + void *pRsp = malloc(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto DROP_DB_OVER; + } + tSerializeSDropDbRsp(pRsp, rspLen, &dropRsp); + mndTransSetRpcRsp(pTrans, pRsp, rspLen); if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER; @@ -789,35 +809,54 @@ DROP_DB_OVER: } static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SDropDbReq *pDrop = pReq->rpcMsg.pCont; + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + SDropDbReq dropReq = {0}; + + if (tDeserializeSDropDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto DROP_DB_OVER; + } - mDebug("db:%s, start to drop", pDrop->db); + mDebug("db:%s, start to drop", dropReq.db); - SDbObj *pDb = mndAcquireDb(pMnode, pDrop->db); + pDb = mndAcquireDb(pMnode, dropReq.db); if (pDb == NULL) { - if (pDrop->ignoreNotExists) { - mDebug("db:%s, not exist, ignore not exist is set", pDrop->db); - return TSDB_CODE_SUCCESS; + if (dropReq.ignoreNotExists) { + code = 0; + goto DROP_DB_OVER; } else { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - mError("db:%s, failed to drop since %s", pDrop->db, terrstr()); - return -1; + goto DROP_DB_OVER; } } - int32_t code = mndDropDb(pMnode, pReq, pDb); - mndReleaseDb(pMnode, pDb); + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto DROP_DB_OVER; + } - if (code != 0) { - mError("db:%s, failed to drop since %s", pDrop->db, terrstr()); - return code; + if (mndCheckAlterDropCompactSyncDbAuth(pUser, pDb) != 0) { + goto DROP_DB_OVER; } - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + code = mndDropDb(pMnode, pReq, pDb); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +DROP_DB_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("db:%s, failed to drop since %s", dropReq.db, terrstr()); + } + + mndReleaseDb(pMnode, pDb); + mndReleaseUser(pMnode, pUser); + + return code; } -static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgList, int32_t *vgNum) { +static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { int32_t vindex = 0; SSdb *pSdb = pMnode->pSdb; @@ -828,168 +867,236 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgLis if (pIter == NULL) break; if (pVgroup->dbUid == pDb->uid) { - SVgroupInfo *pInfo = &vgList[vindex]; - pInfo->vgId = htonl(pVgroup->vgId); - pInfo->hashBegin = htonl(pVgroup->hashBegin); - pInfo->hashEnd = htonl(pVgroup->hashEnd); - pInfo->epset.numOfEps = pVgroup->replica; + SVgroupInfo vgInfo = {0}; + vgInfo.vgId = pVgroup->vgId; + vgInfo.hashBegin = pVgroup->hashBegin; + vgInfo.hashEnd = pVgroup->hashEnd; + vgInfo.epset.numOfEps = pVgroup->replica; for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; - SEp *pEp = &pInfo->epset.eps[gid]; + SEp *pEp = &vgInfo.epset.eps[gid]; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode != NULL) { memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - pEp->port = htons(pDnode->port); + pEp->port = pDnode->port; } mndReleaseDnode(pMnode, pDnode); if (pVgid->role == TAOS_SYNC_STATE_LEADER) { - pInfo->epset.inUse = gid; + vgInfo.epset.inUse = gid; } } vindex++; + taosArrayPush(pVgList, &vgInfo); } sdbRelease(pSdb, pVgroup); } - - *vgNum = vindex; } static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SSdb *pSdb = pMnode->pSdb; - SUseDbReq *pUse = pReq->rpcMsg.pCont; - pUse->vgVersion = htonl(pUse->vgVersion); + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + SUseDbReq usedbReq = {0}; + SUseDbRsp usedbRsp = {0}; + + if (tDeserializeSUseDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &usedbReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto USE_DB_OVER; + } - SDbObj *pDb = mndAcquireDb(pMnode, pUse->db); + pDb = mndAcquireDb(pMnode, usedbReq.db); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - mError("db:%s, failed to process use db req since %s", pUse->db, terrstr()); - return -1; + goto USE_DB_OVER; } - int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); - SUseDbRsp *pRsp = rpcMallocCont(contLen); - if (pRsp == NULL) { - mndReleaseDb(pMnode, pDb); + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto USE_DB_OVER; + } + + if (mndCheckUseDbAuth(pUser, pDb) != 0) { + goto USE_DB_OVER; + } + + usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); + if (usedbRsp.pVgroupInfos == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + goto USE_DB_OVER; + } + + if (usedbReq.vgVersion < pDb->vgVersion) { + mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); } - int32_t vgNum = 0; + memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); + usedbRsp.uid = pDb->uid; + usedbRsp.vgVersion = pDb->vgVersion; + usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); + usedbRsp.hashMethod = pDb->hashMethod; - if (pUse->vgVersion < pDb->vgVersion) { - mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); + int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp); + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto USE_DB_OVER; } - memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); - pRsp->uid = htobe64(pDb->uid); - pRsp->vgVersion = htonl(pDb->vgVersion); - pRsp->vgNum = htonl(vgNum); - pRsp->hashMethod = pDb->hashMethod; + tSerializeSUseDbRsp(pRsp, contLen, &usedbRsp); pReq->pCont = pRsp; pReq->contLen = contLen; + code = 0; + +USE_DB_OVER: + if (code != 0) { + mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr()); + } + mndReleaseDb(pMnode, pDb); + mndReleaseUser(pMnode, pUser); + tFreeSUsedbRsp(&usedbRsp); - return 0; + return code; } -int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen) { - SSdb *pSdb = pMnode->pSdb; - int32_t bufSize = num * (sizeof(SUseDbRsp) + TSDB_DEFAULT_VN_PER_DB * sizeof(SVgroupInfo)); - void *buf = malloc(bufSize); - int32_t len = 0; - int32_t contLen = 0; - int32_t bufOffset = 0; - SUseDbRsp *pRsp = NULL; - - for (int32_t i = 0; i < num; ++i) { - SDbVgVersion *db = &dbs[i]; - db->dbId = be64toh(db->dbId); - db->vgVersion = ntohl(db->vgVersion); - - len = 0; - - SDbObj *pDb = mndAcquireDb(pMnode, db->dbFName); - if (pDb == NULL) { - mInfo("db %s not exist", db->dbFName); - - len = sizeof(SUseDbRsp); - } else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) { - len = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); - } +int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen) { + SUseDbBatchRsp batchUseRsp = {0}; + batchUseRsp.pArray = taosArrayInit(numOfDbs, sizeof(SUseDbRsp)); + if (batchUseRsp.pArray == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - if (0 == len) { - mndReleaseDb(pMnode, pDb); + for (int32_t i = 0; i < numOfDbs; ++i) { + SDbVgVersion *pDbVgVersion = &pDbs[i]; + pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId); + pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion); + SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName); + if (pDb == NULL || pDbVgVersion->vgVersion >= pDb->vgVersion) { + mndReleaseDb(pMnode, pDb); + mDebug("db:%s, no need to use db", pDbVgVersion->dbFName); continue; } - contLen += len; - - if (contLen > bufSize) { - buf = realloc(buf, contLen); - } - - pRsp = (SUseDbRsp *)((char *)buf + bufOffset); - memcpy(pRsp->db, db->dbFName, TSDB_DB_FNAME_LEN); - if (pDb) { - int32_t vgNum = 0; - mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); - - pRsp->uid = htobe64(pDb->uid); - pRsp->vgVersion = htonl(pDb->vgVersion); - pRsp->vgNum = htonl(vgNum); - pRsp->hashMethod = pDb->hashMethod; - } else { - pRsp->uid = htobe64(db->dbId); - pRsp->vgNum = htonl(0); - pRsp->hashMethod = 0; - pRsp->vgVersion = htonl(-1); + SUseDbRsp usedbRsp = {0}; + usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); + if (usedbRsp.pVgroupInfos == NULL) { + mndReleaseDb(pMnode, pDb); + mError("db:%s, failed to malloc usedb response", pDb->name); + continue; } - bufOffset += len; + mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); + memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); + usedbRsp.uid = pDb->uid; + usedbRsp.vgVersion = pDb->vgVersion; + usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos); + usedbRsp.hashMethod = pDb->hashMethod; + taosArrayPush(batchUseRsp.pArray, &usedbRsp); mndReleaseDb(pMnode, pDb); } - if (contLen > 0) { - *rsp = buf; - *rspLen = contLen; - } else { - *rsp = NULL; - tfree(buf); - *rspLen = 0; + int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp); + void *pRsp = malloc(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tFreeSUseDbBatchRsp(&batchUseRsp); + return -1; } + tSerializeSUseDbBatchRsp(pRsp, rspLen, &batchUseRsp); + *ppRsp = pRsp; + *pRspLen = rspLen; + + tFreeSUseDbBatchRsp(&batchUseRsp); return 0; } static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SSyncDbReq *pSync = pReq->rpcMsg.pCont; - SDbObj *pDb = mndAcquireDb(pMnode, pSync->db); + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + SSyncDbReq syncReq = {0}; + + if (tDeserializeSSyncDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &syncReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto SYNC_DB_OVER; + } + + mDebug("db:%s, start to sync", syncReq.db); + + pDb = mndAcquireDb(pMnode, syncReq.db); if (pDb == NULL) { - mError("db:%s, failed to process sync db req since %s", pSync->db, terrstr()); - return -1; + goto SYNC_DB_OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto SYNC_DB_OVER; + } + + if (mndCheckAlterDropCompactSyncDbAuth(pUser, pDb) != 0) { + goto SYNC_DB_OVER; + } + + // code = mndSyncDb(); + +SYNC_DB_OVER: + if (code != 0) { + mError("db:%s, failed to process sync db req since %s", syncReq.db, terrstr()); } mndReleaseDb(pMnode, pDb); - return 0; + mndReleaseUser(pMnode, pUser); + + return code; } static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SCompactDbReq *pCompact = pReq->rpcMsg.pCont; - SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db); + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + SCompactDbReq compactReq = {0}; + + if (tDeserializeSSyncDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &compactReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto SYNC_DB_OVER; + } + + mDebug("db:%s, start to sync", compactReq.db); + + pDb = mndAcquireDb(pMnode, compactReq.db); if (pDb == NULL) { - mError("db:%s, failed to process compact db req since %s", pCompact->db, terrstr()); - return -1; + goto SYNC_DB_OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto SYNC_DB_OVER; + } + + if (mndCheckAlterDropCompactSyncDbAuth(pUser, pDb) != 0) { + goto SYNC_DB_OVER; + } + + // code = mndSyncDb(); + +SYNC_DB_OVER: + if (code != 0) { + mError("db:%s, failed to process compact db req since %s", compactReq.db, terrstr()); } mndReleaseDb(pMnode, pDb); - return 0; + mndReleaseUser(pMnode, pUser); + + return code; } static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index d63ade4320..f798928d9c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -343,17 +343,21 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; - char *batchReqStr = pReq->rpcMsg.pCont; + SClientHbBatchReq batchReq = {0}; - tDeserializeSClientHbBatchReq(batchReqStr, &batchReq); + if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + SArray *pArray = batchReq.reqs; - int sz = taosArrayGetSize(pArray); + int32_t sz = taosArrayGetSize(pArray); SClientHbBatchRsp batchRsp = {0}; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); for (int i = 0; i < sz; i++) { - SClientHbReq* pHbReq = taosArrayGet(pArray, i); + SClientHbReq *pHbReq = taosArrayGet(pArray, i); if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { int32_t kvNum = taosHashGetSize(pHbReq->info); if (NULL == pHbReq->info || kvNum <= 0) { @@ -364,13 +368,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { void *pIter = taosHashIterate(pHbReq->info, NULL); while (pIter != NULL) { - SKv* kv = pIter; - + SKv *kv = pIter; + switch (kv->key) { case HEARTBEAT_KEY_DBINFO: { - void *rspMsg = NULL; + void *rspMsg = NULL; int32_t rspLen = 0; - mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen); + mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen); if (rspMsg && rspLen > 0) { SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; taosArrayPush(hbRsp.info, &kv); @@ -378,9 +382,9 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { break; } case HEARTBEAT_KEY_STBINFO: { - void *rspMsg = NULL; + void *rspMsg = NULL; int32_t rspLen = 0; - mndValidateStbInfo(pMnode, (SSTableMetaVersion *)kv->value, kv->valueLen/sizeof(SSTableMetaVersion), &rspMsg, &rspLen); + mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen); if (rspMsg && rspLen > 0) { SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; taosArrayPush(hbRsp.info, &kv); @@ -392,7 +396,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { hbRsp.status = TSDB_CODE_MND_APP_ERROR; break; } - + pIter = taosHashIterate(pHbReq->info, pIter); } @@ -407,15 +411,14 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { } taosArrayDestroyEx(pArray, tFreeClientHbReq); - int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); - void* buf = rpcMallocCont(tlen); - void* abuf = buf; - tSerializeSClientHbBatchRsp(&abuf, &batchRsp); - + int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp); + void *buf = rpcMallocCont(tlen); + tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp); + int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps); for (int32_t i = 0; i < rspNum; ++i) { SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i); - int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info): 0; + int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0; for (int32_t n = 0; n < kvNum; ++n) { SKv *kv = taosArrayGet(rsp->info, n); tfree(kv->value); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 6686c0887f..3230074add 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -542,12 +542,18 @@ static void mndTransSendRpcRsp(STrans *pTrans) { } if (sendRsp && pTrans->rpcHandle != NULL) { + void *rpcCont = rpcMallocCont(pTrans->rpcRspLen); + if (rpcCont != NULL) { + memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen); + } + free(pTrans->rpcRsp); + mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, pTrans->rpcAHandle); SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle, - .pCont = pTrans->rpcRsp, + .pCont = rpcCont, .contLen = pTrans->rpcRspLen}; rpcSendResponse(&rspMsg); pTrans->rpcHandle = NULL; diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index 4c09caed9c..a3efad8aa2 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -127,18 +127,20 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { CheckBinary("master", 9); { - int32_t contLen = sizeof(SAlterDbReq); - - SAlterDbReq* pReq = (SAlterDbReq*)rpcMallocCont(contLen); - strcpy(pReq->db, "1.d1"); - pReq->totalBlocks = htonl(12); - pReq->daysToKeep0 = htonl(300); - pReq->daysToKeep1 = htonl(400); - pReq->daysToKeep2 = htonl(500); - pReq->fsyncPeriod = htonl(4000); - pReq->walLevel = 2; - pReq->quorum = 2; - pReq->cacheLastRow = 1; + SAlterDbReq alterdbReq = {0}; + strcpy(alterdbReq.db, "1.d1"); + alterdbReq.totalBlocks = 12; + alterdbReq.daysToKeep0 = 300; + alterdbReq.daysToKeep1 = 400; + alterdbReq.daysToKeep2 = 500; + alterdbReq.fsyncPeriod = 4000; + alterdbReq.walLevel = 2; + alterdbReq.quorum = 2; + alterdbReq.cacheLastRow = 1; + + int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSAlterDbReq(pReq, contLen, &alterdbReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -196,18 +198,20 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { CheckInt8(0); // update { - int32_t contLen = sizeof(SDropDbReq); + SDropDbReq dropdbReq = {0}; + strcpy(dropdbReq.db, "1.d1"); - SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen); - strcpy(pReq->db, "1.d1"); + int32_t contLen = tSerializeSDropDbReq(NULL, 0, &dropdbReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSDropDbReq(pReq, contLen, &dropdbReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); - SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont; - pDrop->uid = htobe64(pDrop->uid); - EXPECT_STREQ(pDrop->db, "1.d1"); + SDropDbRsp dropdbRsp = {0}; + tDeserializeSDropDbRsp(pRsp->pCont, pRsp->contLen, &dropdbRsp); + EXPECT_STREQ(dropdbRsp.db, "1.d1"); } test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); @@ -260,73 +264,74 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { uint64_t d2_uid = 0; { - int32_t contLen = sizeof(SUseDbReq); + SUseDbReq usedbReq = {0}; + strcpy(usedbReq.db, "1.d2"); + usedbReq.vgVersion = -1; - SUseDbReq* pReq = (SUseDbReq*)rpcMallocCont(contLen); - strcpy(pReq->db, "1.d2"); - pReq->vgVersion = htonl(-1); + int32_t contLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSUseDbReq(pReq, contLen, &usedbReq); SRpcMsg* pMsg = test.SendReq(TDMT_MND_USE_DB, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont; - EXPECT_STREQ(pRsp->db, "1.d2"); - pRsp->uid = htobe64(pRsp->uid); - d2_uid = pRsp->uid; - pRsp->vgVersion = htonl(pRsp->vgVersion); - pRsp->vgNum = htonl(pRsp->vgNum); - pRsp->hashMethod = pRsp->hashMethod; - EXPECT_EQ(pRsp->vgVersion, 1); - EXPECT_EQ(pRsp->vgNum, 2); - EXPECT_EQ(pRsp->hashMethod, 1); + SUseDbRsp usedbRsp = {0}; + tDeserializeSUseDbRsp(pMsg->pCont, pMsg->contLen, &usedbRsp); + EXPECT_STREQ(usedbRsp.db, "1.d2"); + EXPECT_EQ(usedbRsp.vgVersion, 1); + EXPECT_EQ(usedbRsp.vgNum, 2); + EXPECT_EQ(usedbRsp.hashMethod, 1); + d2_uid = usedbRsp.uid; { - SVgroupInfo* pInfo = &pRsp->vgroupInfo[0]; - pInfo->vgId = htonl(pInfo->vgId); - pInfo->hashBegin = htonl(pInfo->hashBegin); - pInfo->hashEnd = htonl(pInfo->hashEnd); + SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(usedbRsp.pVgroupInfos, 0); + pInfo->vgId = pInfo->vgId; + pInfo->hashBegin = pInfo->hashBegin; + pInfo->hashEnd = pInfo->hashEnd; EXPECT_GT(pInfo->vgId, 0); EXPECT_EQ(pInfo->hashBegin, 0); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1); EXPECT_EQ(pInfo->epset.inUse, 0); EXPECT_EQ(pInfo->epset.numOfEps, 1); SEp* pAddr = &pInfo->epset.eps[0]; - pAddr->port = htons(pAddr->port); EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); } { - SVgroupInfo* pInfo = &pRsp->vgroupInfo[1]; - pInfo->vgId = htonl(pInfo->vgId); - pInfo->hashBegin = htonl(pInfo->hashBegin); - pInfo->hashEnd = htonl(pInfo->hashEnd); + SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(usedbRsp.pVgroupInfos, 1); + pInfo->vgId = pInfo->vgId; + pInfo->hashBegin = pInfo->hashBegin; + pInfo->hashEnd = pInfo->hashEnd; EXPECT_GT(pInfo->vgId, 0); EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX); EXPECT_EQ(pInfo->epset.inUse, 0); EXPECT_EQ(pInfo->epset.numOfEps, 1); SEp* pAddr = &pInfo->epset.eps[0]; - pAddr->port = htons(pAddr->port); EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); } + + tFreeSUsedbRsp(&usedbRsp); } { - int32_t contLen = sizeof(SDropDbReq); + SDropDbReq dropdbReq = {0}; + strcpy(dropdbReq.db, "1.d2"); - SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen); - strcpy(pReq->db, "1.d2"); + int32_t contLen = tSerializeSDropDbReq(NULL, 0, &dropdbReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSDropDbReq(pReq, contLen, &dropdbReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); - SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont; - pDrop->uid = htobe64(pDrop->uid); - EXPECT_STREQ(pDrop->db, "1.d2"); - EXPECT_EQ(pDrop->uid, d2_uid); + SDropDbRsp dropdbRsp = {0}; + tDeserializeSDropDbRsp(pRsp->pCont, pRsp->contLen, &dropdbRsp); + EXPECT_STREQ(dropdbRsp.db, "1.d2"); + EXPECT_EQ(dropdbRsp.uid, d2_uid); } } diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index ccf13d5d66..5b7bcd47b8 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -96,35 +96,35 @@ TEST_F(MndTestProfile, 03_ConnectMsg_Show) { } TEST_F(MndTestProfile, 04_HeartBeatMsg) { - SClientHbBatchReq batchReq; + SClientHbBatchReq batchReq = {0}; batchReq.reqs = taosArrayInit(0, sizeof(SClientHbReq)); SClientHbReq req = {0}; req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ}; req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - SKv kv; + SKv kv = {0}; kv.key = 123; kv.value = (void*)"bcd"; kv.valueLen = 4; taosHashPut(req.info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosArrayPush(batchReq.reqs, &req); - int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq); - - void* buf = (SClientHbBatchReq*)rpcMallocCont(tlen); - void* bufCopy = buf; - tSerializeSClientHbBatchReq(&bufCopy, &batchReq); + int32_t tlen = tSerializeSClientHbBatchReq(NULL, 0, &batchReq); + void* buf = (SClientHbBatchReq*)rpcMallocCont(tlen); + tSerializeSClientHbBatchReq(buf, tlen, &batchReq); + SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, buf, tlen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - char* pRspChar = (char*)pMsg->pCont; + SClientHbBatchRsp rsp = {0}; - tDeserializeSClientHbBatchRsp(pRspChar, &rsp); + tDeserializeSClientHbBatchRsp(pMsg->pCont, pMsg->contLen, &rsp); int sz = taosArrayGetSize(rsp.rsps); ASSERT_EQ(sz, 0); - //SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0); - //EXPECT_EQ(pRsp->connKey.connId, 123); - //EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); - //EXPECT_EQ(pRsp->status, 0); + + // SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0); + // EXPECT_EQ(pRsp->connKey.connId, 123); + // EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); + // EXPECT_EQ(pRsp->status, 0); #if 0 int32_t contLen = sizeof(SHeartBeatReq); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 4eeae0a244..7742d8ee22 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -101,7 +101,6 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { rpcMsg.msgType = TDMT_MND_CREATE_DB; SRpcMsg rpcRsp = {0}; - rpcSendRecv(shandle, pEpSet, &rpcMsg, &rpcRsp); ASSERT_EQ(rpcRsp.code, 0); @@ -252,39 +251,43 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) { } void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - SUseDbRsp *rspMsg = NULL; // todo - - pRsp->code = 0; - pRsp->contLen = sizeof(SUseDbRsp) + ctgTestVgNum * sizeof(SVgroupInfo); - pRsp->pCont = calloc(1, pRsp->contLen); - rspMsg = (SUseDbRsp *)pRsp->pCont; - strcpy(rspMsg->db, ctgTestDbname); - rspMsg->vgVersion = htonl(ctgTestVgVersion); + SUseDbRsp usedbRsp = {0}; + strcpy(usedbRsp.db, ctgTestDbname); + usedbRsp.vgVersion = ctgTestVgVersion; ctgTestCurrentVgVersion = ctgTestVgVersion; - rspMsg->vgNum = htonl(ctgTestVgNum); - rspMsg->hashMethod = 0; - rspMsg->uid = htobe64(ctgTestDbId); + usedbRsp.vgNum = ctgTestVgNum; + usedbRsp.hashMethod = 0; + usedbRsp.uid = ctgTestDbId; + usedbRsp.pVgroupInfos = taosArrayInit(usedbRsp.vgNum, sizeof(SVgroupInfo)); - SVgroupInfo *vg = NULL; - uint32_t hashUnit = UINT32_MAX / ctgTestVgNum; + uint32_t hashUnit = UINT32_MAX / ctgTestVgNum; for (int32_t i = 0; i < ctgTestVgNum; ++i) { - vg = &rspMsg->vgroupInfo[i]; - - vg->vgId = htonl(i + 1); - vg->hashBegin = htonl(i * hashUnit); - vg->hashEnd = htonl(hashUnit * (i + 1) - 1); - vg->epset.numOfEps = i % TSDB_MAX_REPLICA + 1; - vg->epset.inUse = i % vg->epset.numOfEps; - for (int32_t n = 0; n < vg->epset.numOfEps; ++n) { - SEp *addr = &vg->epset.eps[n]; + SVgroupInfo vg = {0}; + vg.vgId = i + 1; + vg.hashBegin = i * hashUnit; + vg.hashEnd = hashUnit * (i + 1) - 1; + if (i == ctgTestVgNum - 1) { + vg.hashEnd = htonl(UINT32_MAX); + } + + vg.epset.numOfEps = i % TSDB_MAX_REPLICA + 1; + vg.epset.inUse = i % vg.epset.numOfEps; + for (int32_t n = 0; n < vg.epset.numOfEps; ++n) { + SEp *addr = &vg.epset.eps[n]; strcpy(addr->fqdn, "a0"); - addr->port = htons(n + 22); + addr->port = n + 22; } + + taosArrayPush(usedbRsp.pVgroupInfos, &vg); } - vg->hashEnd = htonl(UINT32_MAX); + int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp); + void *pReq = rpcMallocCont(contLen); + tSerializeSUseDbRsp(pReq, contLen, &usedbRsp); - return; + pRsp->code = 0; + pRsp->contLen = contLen; + pRsp->pCont = pReq; } void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 3e14bfca09..4acfb6239b 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -83,78 +83,63 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms return TSDB_CODE_SUCCESS; } - -int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { +int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { if (NULL == output || NULL == msg || msgSize <= 0) { return TSDB_CODE_TSC_INVALID_INPUT; } - SUseDbRsp *pRsp = (SUseDbRsp *)msg; SUseDbOutput *pOut = (SUseDbOutput *)output; - int32_t code = 0; + int32_t code = 0; - if (msgSize <= sizeof(*pRsp)) { - qError("invalid use db rsp msg size, msgSize:%d", msgSize); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + SUseDbRsp usedbRsp = {0}; + if (tDeserializeSUseDbRsp(msg, msgSize, &usedbRsp) != 0) { + qError("invalid use db rsp msg, msgSize:%d", msgSize); + return TSDB_CODE_INVALID_MSG; } - - pRsp->vgVersion = ntohl(pRsp->vgVersion); - pRsp->vgNum = ntohl(pRsp->vgNum); - pRsp->uid = be64toh(pRsp->uid); - if (pRsp->vgNum < 0) { - qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum); + if (usedbRsp.vgNum < 0) { + qError("invalid db[%s] vgroup number[%d]", usedbRsp.db, usedbRsp.vgNum); return TSDB_CODE_TSC_INVALID_VALUE; } - int32_t expectSize = pRsp->vgNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp); - if (msgSize != expectSize) { - qError("use db rsp size mis-match, msgSize:%d, expected:%d, vgnumber:%d", msgSize, expectSize, pRsp->vgNum); - return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; - } - pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo)); if (NULL == pOut->dbVgroup) { qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo)); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pOut->dbId = pRsp->uid; - pOut->dbVgroup->vgVersion = pRsp->vgVersion; - pOut->dbVgroup->hashMethod = pRsp->hashMethod; - pOut->dbVgroup->vgHash = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + pOut->dbId = usedbRsp.uid; + pOut->dbVgroup->vgVersion = usedbRsp.vgVersion; + pOut->dbVgroup->hashMethod = usedbRsp.hashMethod; + pOut->dbVgroup->vgHash = + taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pOut->dbVgroup->vgHash) { - qError("taosHashInit %d failed", pRsp->vgNum); + qError("taosHashInit %d failed", usedbRsp.vgNum); tfree(pOut->dbVgroup); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - for (int32_t i = 0; i < pRsp->vgNum; ++i) { - pRsp->vgroupInfo[i].vgId = ntohl(pRsp->vgroupInfo[i].vgId); - pRsp->vgroupInfo[i].hashBegin = ntohl(pRsp->vgroupInfo[i].hashBegin); - pRsp->vgroupInfo[i].hashEnd = ntohl(pRsp->vgroupInfo[i].hashEnd); + for (int32_t i = 0; i < usedbRsp.vgNum; ++i) { + SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp.pVgroupInfos, i); - for (int32_t n = 0; n < pRsp->vgroupInfo[i].epset.numOfEps; ++n) { - pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port); - } - - if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { + if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { qError("taosHashPut failed"); goto _return; } } - memcpy(pOut->db, pRsp->db, sizeof(pOut->db)); + memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN); return code; _return: + taosArrayDestroy(usedbRsp.pVgroupInfos); if (pOut) { taosHashCleanup(pOut->dbVgroup->vgHash); tfree(pOut->dbVgroup); } - + return code; } -- GitLab