提交 e89dc532 编写于 作者: S Shengliang Guan

seralize db msg

上级 d7fc732d
...@@ -328,6 +328,26 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { ...@@ -328,6 +328,26 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
return buf; return buf;
} }
static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1;
if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
static FORCE_INLINE int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp) {
if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1;
if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1;
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
int64_t clusterId; int64_t clusterId;
...@@ -612,6 +632,27 @@ typedef struct { ...@@ -612,6 +632,27 @@ typedef struct {
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
uint64_t uid;
int32_t vgVersion;
int32_t vgNum;
int8_t hashMethod;
SArray* pVgroupInfos; // Array of SVgroupInfo
} SUseDbRsp;
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
typedef struct {
SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp;
int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
void tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp);
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
} SSyncDbReq, SCompactDbReq; } SSyncDbReq, SCompactDbReq;
...@@ -841,15 +882,6 @@ typedef struct { ...@@ -841,15 +882,6 @@ typedef struct {
char* data; char* data;
} STagData; } STagData;
typedef struct {
char db[TSDB_DB_FNAME_LEN];
uint64_t uid;
int32_t vgVersion;
int32_t vgNum;
int8_t hashMethod;
SVgroupInfo vgroupInfo[];
} SUseDbRsp;
/* /*
* sql: show tables like '%a_%' * sql: show tables like '%a_%'
* payload is the query condition, e.g., '%a_%' * payload is the query condition, e.g., '%a_%'
...@@ -1535,43 +1567,30 @@ typedef struct { ...@@ -1535,43 +1567,30 @@ typedef struct {
SArray* rsps; // SArray<SClientHbRsp> SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp; } SClientHbBatchRsp;
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); }
return taosIntHash_64(key, keyLen);
}
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq);
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); void* pIter = taosHashIterate(info, NULL);
static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
void *pIter = taosHashIterate(info, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SKv* kv = (SKv*)pIter; SKv* kv = (SKv*)pIter;
tfree(kv->value); tfree(kv->value);
pIter = taosHashIterate(info, pIter); pIter = taosHashIterate(info, pIter);
} }
} }
static FORCE_INLINE void tFreeClientHbReq(void* pReq) {
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = (SClientHbReq*)pReq; SClientHbReq* req = (SClientHbReq*)pReq;
if (req->info) { if (req->info) {
tFreeReqKvHash(req->info); tFreeReqKvHash(req->info);
taosHashCleanup(req->info); taosHashCleanup(req->info);
} }
} }
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq);
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq);
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; SClientHbBatchReq* req = (SClientHbBatchReq*)pReq;
if (deep) { if (deep) {
taosArrayDestroyEx(req->reqs, tFreeClientHbReq); taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
} else { } else {
...@@ -1580,54 +1599,52 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { ...@@ -1580,54 +1599,52 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
free(pReq); free(pReq);
} }
static FORCE_INLINE void tFreeClientKv(void *pKv) { static FORCE_INLINE void tFreeClientKv(void* pKv) {
SKv *kv = (SKv *)pKv; SKv* kv = (SKv*)pKv;
if (kv) { if (kv) {
tfree(kv->value); tfree(kv->value);
} }
} }
static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) { static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) {
SClientHbRsp* rsp = (SClientHbRsp*)pRsp; SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
} }
static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp; SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp;
taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp); taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp);
} }
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { if (tEncodeCStrWithLen(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
int tlen = 0; return 0;
tlen += taosEncodeFixedI32(buf, pKv->key);
tlen += taosEncodeFixedI32(buf, pKv->valueLen);
tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen);
return tlen;
} }
static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
buf = taosDecodeFixedI32(buf, &pKv->key); if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
buf = taosDecodeFixedI32(buf, &pKv->valueLen); if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); pKv->value = malloc(pKv->valueLen + 1);
return buf; if (pKv->value == NULL) return -1;
if (tDecodeCStrTo(pDecoder, (char*)pKv->value) < 0) return -1;
return 0;
} }
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
int tlen = 0; if (tEncodeI32(pEncoder, pKey->connId) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pKey->connId); if (tEncodeI32(pEncoder, pKey->hbType) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pKey->hbType); return 0;
return tlen;
} }
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
buf = taosDecodeFixedI32(buf, &pKey->connId); if (tDecodeI32(pDecoder, &pKey->connId) < 0) return -1;
buf = taosDecodeFixedI32(buf, &pKey->hbType); if (tDecodeI32(pDecoder, &pKey->hbType) < 0) return -1;
return buf; return 0;
} }
typedef struct SMqHbVgInfo { typedef struct SMqHbVgInfo {
......
...@@ -30,14 +30,16 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) ...@@ -30,14 +30,16 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp)
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t code = 0; int32_t code = 0;
while (msgLen < valueLen) {
SUseDbRsp *rsp = (SUseDbRsp *)((char *)value + msgLen);
rsp->vgVersion = ntohl(rsp->vgVersion); SUseDbBatchRsp batchUseRsp = {0};
rsp->vgNum = ntohl(rsp->vgNum); if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) {
rsp->uid = be64toh(rsp->uid); terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
if (rsp->vgVersion < 0) { if (rsp->vgVersion < 0) {
...@@ -52,22 +54,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -52,22 +54,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < rsp->vgNum; ++i) { for (int32_t j = 0; j < rsp->vgNum; ++j) {
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId); SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin); if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) {
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
tscError("hash push failed, errno:%d", errno); tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgHash); taosHashCleanup(vgInfo.vgHash);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
} }
code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo); code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo);
if (code) { if (code) {
taosHashCleanup(vgInfo.vgHash); taosHashCleanup(vgInfo.vgHash);
...@@ -201,9 +196,10 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code ...@@ -201,9 +196,10 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
tfree(param); tfree(param);
return -1; return -1;
} }
char *key = (char *)param; char *key = (char *)param;
SClientHbBatchRsp pRsp = {0}; SClientHbBatchRsp pRsp = {0};
tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp); tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
int32_t rspNum = taosArrayGetSize(pRsp.rsps); int32_t rspNum = taosArrayGetSize(pRsp.rsps);
...@@ -416,7 +412,7 @@ static void* hbThreadFunc(void* param) { ...@@ -416,7 +412,7 @@ static void* hbThreadFunc(void* param) {
if (pReq == NULL) { if (pReq == NULL) {
continue; continue;
} }
int tlen = tSerializeSClientHbBatchReq(NULL, pReq); int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
void *buf = malloc(tlen); void *buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -424,8 +420,7 @@ static void* hbThreadFunc(void* param) { ...@@ -424,8 +420,7 @@ static void* hbThreadFunc(void* param) {
hbClearReqInfo(pAppHbMgr); hbClearReqInfo(pAppHbMgr);
break; break;
} }
void *abuf = buf; tSerializeSClientHbBatchReq(buf, tlen, pReq);
tSerializeSClientHbBatchReq(&abuf, pReq);
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
if (pInfo == NULL) { if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......
...@@ -85,118 +85,156 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { ...@@ -85,118 +85,156 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
} }
} }
int32_t tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) {
int32_t tlen = 0; if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
int32_t kvNum = taosHashGetSize(pReq->info); int32_t kvNum = taosHashGetSize(pReq->info);
tlen += taosEncodeFixedI32(buf, kvNum); if (tEncodeI32(pEncoder, kvNum) < 0) return -1;
SKv *kv;
void *pIter = taosHashIterate(pReq->info, NULL); void *pIter = taosHashIterate(pReq->info, NULL);
while (pIter != NULL) { while (pIter != NULL) {
kv = pIter; SKv *kv = pIter;
tlen += taosEncodeSKv(buf, kv); if (tEncodeSKv(pEncoder, kv) < 0) return -1;
pIter = taosHashIterate(pReq->info, pIter); pIter = taosHashIterate(pReq->info, pIter);
} }
return tlen;
return 0;
} }
void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { static int32_t tDeserializeSClientHbReq(SCoder *pDecoder, SClientHbReq *pReq) {
buf = taosDecodeSClientHbKey(buf, &pReq->connKey); if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1;
// TODO: error handling int32_t kvNum = 0;
int32_t kvNum; if (tDecodeI32(pDecoder, &kvNum) < 0) return -1;
buf = taosDecodeFixedI32(buf, &kvNum);
if (pReq->info == NULL) { if (pReq->info == NULL) {
pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
} }
if (pReq->info == NULL) return -1;
for (int32_t i = 0; i < kvNum; i++) { for (int32_t i = 0; i < kvNum; i++) {
SKv kv; SKv kv = {0};
buf = taosDecodeSKv(buf, &kv); if (tDecodeSKv(pDecoder, &kv) < 0) return -1;
taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
} }
return buf; return 0;
} }
int32_t tSerializeSClientHbRsp(void **buf, const SClientHbRsp *pRsp) { static int32_t tSerializeSClientHbRsp(SCoder *pEncoder, const SClientHbRsp *pRsp) {
int32_t tlen = 0; if (tEncodeSClientHbKey(pEncoder, &pRsp->connKey) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->status) < 0) return -1;
int32_t kvNum = taosArrayGetSize(pRsp->info); int32_t kvNum = taosArrayGetSize(pRsp->info);
tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey); if (tEncodeI32(pEncoder, kvNum) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pRsp->status);
tlen += taosEncodeFixedI32(buf, kvNum);
for (int32_t i = 0; i < kvNum; i++) { for (int32_t i = 0; i < kvNum; i++) {
SKv *kv = (SKv *)taosArrayGet(pRsp->info, i); SKv *kv = taosArrayGet(pRsp->info, i);
tlen += taosEncodeSKv(buf, kv); if (tEncodeSKv(pEncoder, kv) < 0) return -1;
} }
return tlen;
return 0;
} }
void *tDeserializeSClientHbRsp(void *buf, SClientHbRsp *pRsp) { static int32_t tDeserializeSClientHbRsp(SCoder *pDecoder, SClientHbRsp *pRsp) {
if (tDecodeSClientHbKey(pDecoder, &pRsp->connKey) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->status) < 0) return -1;
int32_t kvNum = 0; int32_t kvNum = 0;
buf = taosDecodeSClientHbKey(buf, &pRsp->connKey); if (tDecodeI32(pDecoder, &kvNum) < 0) return -1;
buf = taosDecodeFixedI32(buf, &pRsp->status);
buf = taosDecodeFixedI32(buf, &kvNum);
pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); pRsp->info = taosArrayInit(kvNum, sizeof(SKv));
if (pRsp->info == NULL) return -1;
for (int32_t i = 0; i < kvNum; i++) { for (int32_t i = 0; i < kvNum; i++) {
SKv kv = {0}; SKv kv = {0};
buf = taosDecodeSKv(buf, &kv); tDecodeSKv(pDecoder, &kv);
taosArrayPush(pRsp->info, &kv); taosArrayPush(pRsp->info, &kv);
} }
return buf; return 0;
} }
int32_t tSerializeSClientHbBatchReq(void **buf, const SClientHbBatchReq *pBatchReq) { int32_t tSerializeSClientHbBatchReq(void *buf, int32_t bufLen, const SClientHbBatchReq *pBatchReq) {
int32_t tlen = 0; SCoder encoder = {0};
tlen += taosEncodeFixedI64(buf, pBatchReq->reqId); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pBatchReq->reqId) < 0) return -1;
int32_t reqNum = taosArrayGetSize(pBatchReq->reqs); int32_t reqNum = taosArrayGetSize(pBatchReq->reqs);
tlen += taosEncodeFixedI32(buf, reqNum); if (tEncodeI32(&encoder, reqNum) < 0) return -1;
for (int32_t i = 0; i < reqNum; i++) { for (int32_t i = 0; i < reqNum; i++) {
SClientHbReq *pReq = taosArrayGet(pBatchReq->reqs, i); SClientHbReq *pReq = taosArrayGet(pBatchReq->reqs, i);
tlen += tSerializeSClientHbReq(buf, pReq); if (tSerializeSClientHbReq(&encoder, pReq) < 0) return -1;
} }
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen; return tlen;
} }
void *tDeserializeSClientHbBatchReq(void *buf, SClientHbBatchReq *pBatchReq) { int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchReq *pBatchReq) {
buf = taosDecodeFixedI64(buf, &pBatchReq->reqId); SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pBatchReq->reqId) < 0) return -1;
int32_t reqNum = 0;
if (tDecodeI32(&decoder, &reqNum) < 0) return -1;
if (pBatchReq->reqs == NULL) { if (pBatchReq->reqs == NULL) {
pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq)); pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq));
} }
int32_t reqNum;
buf = taosDecodeFixedI32(buf, &reqNum);
for (int32_t i = 0; i < reqNum; i++) { for (int32_t i = 0; i < reqNum; i++) {
SClientHbReq req = {0}; SClientHbReq req = {0};
buf = tDeserializeSClientHbReq(buf, &req); tDeserializeSClientHbReq(&decoder, &req);
taosArrayPush(pBatchReq->reqs, &req); taosArrayPush(pBatchReq->reqs, &req);
} }
return buf;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
} }
int32_t tSerializeSClientHbBatchRsp(void **buf, const SClientHbBatchRsp *pBatchRsp) { int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBatchRsp *pBatchRsp) {
int32_t tlen = 0; SCoder encoder = {0};
int32_t sz = taosArrayGetSize(pBatchRsp->rsps); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pBatchRsp->reqId) < 0) return -1;
if (tEncodeI64(&encoder, pBatchRsp->rspId) < 0) return -1;
int32_t rspNum = taosArrayGetSize(pBatchRsp->rsps);
if (tEncodeI32(&encoder, rspNum) < 0) return -1;
for (int32_t i = 0; i < rspNum; i++) {
SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i); SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i);
tlen += tSerializeSClientHbRsp(buf, pRsp); if (tSerializeSClientHbRsp(&encoder, pRsp) < 0) return -1;
} }
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen; return tlen;
} }
void *tDeserializeSClientHbBatchRsp(void *buf, SClientHbBatchRsp *pBatchRsp) { int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchRsp *pBatchRsp) {
int32_t sz; SCoder decoder = {0};
buf = taosDecodeFixedI32(buf, &sz); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SClientHbRsp));
for (int32_t i = 0; i < sz; i++) { if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pBatchRsp->reqId) < 0) return -1;
if (tDecodeI64(&decoder, &pBatchRsp->rspId) < 0) return -1;
int32_t rspNum = 0;
if (tDecodeI32(&decoder, &rspNum) < 0) return -1;
if (pBatchRsp->rsps == NULL) {
pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbReq));
}
for (int32_t i = 0; i < rspNum; i++) {
SClientHbRsp rsp = {0}; SClientHbRsp rsp = {0};
buf = tDeserializeSClientHbRsp(buf, &rsp); tDeserializeSClientHbRsp(&decoder, &rsp);
taosArrayPush(pBatchRsp->rsps, &rsp); taosArrayPush(pBatchRsp->rsps, &rsp);
} }
return buf;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
} }
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
...@@ -1306,4 +1344,129 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) { ...@@ -1306,4 +1344,129 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) {
tCoderClear(&decoder); tCoderClear(&decoder);
return 0; return 0;
}
static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
if (tEncodeU64(pEncoder, pRsp->uid) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->vgNum) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->hashMethod) < 0) return -1;
for (int32_t i = 0; i < pRsp->vgNum; ++i) {
SVgroupInfo *pVgInfo = taosArrayGet(pRsp->pVgroupInfos, i);
if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1;
}
return 0;
}
int32_t tSerializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tSerializeSUseDbRspImp(&encoder, pRsp) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
for (int32_t i = 0; i < numOfBatch; ++i) {
SUseDbRsp *pUsedbRsp = taosArrayGet(pRsp->pArray, i);
if (tSerializeSUseDbRspImp(&encoder, pUsedbRsp) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) {
if (tDecodeCStrTo(pDecoder, pRsp->db) < 0) return -1;
if (tDecodeU64(pDecoder, &pRsp->uid) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->vgVersion) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->vgNum) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->hashMethod) < 0) return -1;
pRsp->pVgroupInfos = taosArrayInit(pRsp->vgNum, sizeof(SVgroupInfo));
if (pRsp->pVgroupInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < pRsp->vgNum; ++i) {
SVgroupInfo vgInfo = {0};
if (tDecodeI32(pDecoder, &vgInfo.vgId) < 0) return -1;
if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1;
if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1;
taosArrayPush(pRsp->pVgroupInfos, &vgInfo);
}
return 0;
}
int32_t tDeserializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDeserializeSUseDbRspImp(&decoder, pRsp) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SUseDbBatchRsp));
if (pRsp->pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfBatch; ++i) {
SUseDbRsp usedbRsp = {0};
if (tDeserializeSUseDbRspImp(&decoder, &usedbRsp) < 0) return -1;
taosArrayPush(pRsp->pArray, &usedbRsp);
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
void tFreeSUsedbRsp(SUseDbRsp *pRsp) { taosArrayDestroy(pRsp->pVgroupInfos); }
void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
for (int32_t i = 0; i < numOfBatch; ++i) {
SUseDbRsp *pUsedbRsp = taosArrayGet(pRsp->pArray, i);
tFreeSUsedbRsp(pUsedbRsp);
}
taosArrayDestroy(pRsp->pArray);
} }
\ No newline at end of file
...@@ -26,7 +26,7 @@ int32_t mndInitDb(SMnode *pMnode); ...@@ -26,7 +26,7 @@ int32_t mndInitDb(SMnode *pMnode);
void mndCleanupDb(SMnode *pMnode); void mndCleanupDb(SMnode *pMnode);
SDbObj *mndAcquireDb(SMnode *pMnode, char *db); SDbObj *mndAcquireDb(SMnode *pMnode, char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen); int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -634,43 +634,56 @@ UPDATE_DB_OVER: ...@@ -634,43 +634,56 @@ UPDATE_DB_OVER:
} }
static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) { static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SAlterDbReq *pAlter = pReq->rpcMsg.pCont; int32_t code = -1;
pAlter->totalBlocks = htonl(pAlter->totalBlocks); SDbObj *pDb = NULL;
pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0); SUserObj *pUser = NULL;
pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1); SAlterDbReq alterReq = {0};
pAlter->daysToKeep2 = htonl(pAlter->daysToKeep2);
pAlter->fsyncPeriod = htonl(pAlter->fsyncPeriod); if (tDeserializeSAlterDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto ALTER_DB_OVER;
}
mDebug("db:%s, start to alter", pAlter->db); mDebug("db:%s, start to alter", alterReq.db);
SDbObj *pDb = mndAcquireDb(pMnode, pAlter->db); pDb = mndAcquireDb(pMnode, alterReq.db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to alter since %s", pAlter->db, terrstr()); terrno = TSDB_CODE_MND_DB_NOT_EXIST;
return TSDB_CODE_MND_DB_NOT_EXIST; goto ALTER_DB_OVER;
}
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto ALTER_DB_OVER;
}
if (mndCheckAlterDropCompactSyncDbAuth(pUser, pDb) != 0) {
goto ALTER_DB_OVER;
} }
SDbObj dbObj = {0}; SDbObj dbObj = {0};
memcpy(&dbObj, pDb, sizeof(SDbObj)); memcpy(&dbObj, pDb, sizeof(SDbObj));
int32_t code = mndSetDbCfgFromAlterDbReq(&dbObj, pAlter); code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq);
if (code != 0) { if (code != 0) {
mndReleaseDb(pMnode, pDb); goto ALTER_DB_OVER;
mError("db:%s, failed to alter since %s", pAlter->db, tstrerror(code));
return code;
} }
dbObj.cfgVersion++; dbObj.cfgVersion++;
dbObj.updateTime = taosGetTimestampMs(); dbObj.updateTime = taosGetTimestampMs();
code = mndUpdateDb(pMnode, pReq, pDb, &dbObj); code = mndUpdateDb(pMnode, pReq, pDb, &dbObj);
mndReleaseDb(pMnode, pDb); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
if (code != 0) { ALTER_DB_OVER:
mError("db:%s, failed to alter since %s", pAlter->db, tstrerror(code)); if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
return code; mError("db:%s, failed to alter since %s", alterReq.db, terrstr());
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; mndReleaseDb(pMnode, pDb);
mndReleaseUser(pMnode, pUser);
return code;
} }
static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
...@@ -772,11 +785,18 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { ...@@ -772,11 +785,18 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
int32_t rspLen = sizeof(SDropDbRsp); SDropDbRsp dropRsp = {0};
SDropDbRsp *pRsp = rpcMallocCont(rspLen); memcpy(dropRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
if (pRsp == NULL) goto DROP_DB_OVER; dropRsp.uid = pDb->uid;
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = htobe64(pDb->uid); int32_t rspLen = tSerializeSDropDbRsp(NULL, 0, &dropRsp);
void *pRsp = malloc(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto DROP_DB_OVER;
}
tSerializeSDropDbRsp(pRsp, rspLen, &dropRsp);
mndTransSetRpcRsp(pTrans, pRsp, rspLen); mndTransSetRpcRsp(pTrans, pRsp, rspLen);
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER;
...@@ -789,35 +809,54 @@ DROP_DB_OVER: ...@@ -789,35 +809,54 @@ DROP_DB_OVER:
} }
static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) { static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SDropDbReq *pDrop = pReq->rpcMsg.pCont; int32_t code = -1;
SDbObj *pDb = NULL;
SUserObj *pUser = NULL;
SDropDbReq dropReq = {0};
if (tDeserializeSDropDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_DB_OVER;
}
mDebug("db:%s, start to drop", pDrop->db); mDebug("db:%s, start to drop", dropReq.db);
SDbObj *pDb = mndAcquireDb(pMnode, pDrop->db); pDb = mndAcquireDb(pMnode, dropReq.db);
if (pDb == NULL) { if (pDb == NULL) {
if (pDrop->ignoreNotExists) { if (dropReq.ignoreNotExists) {
mDebug("db:%s, not exist, ignore not exist is set", pDrop->db); code = 0;
return TSDB_CODE_SUCCESS; goto DROP_DB_OVER;
} else { } else {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mError("db:%s, failed to drop since %s", pDrop->db, terrstr()); goto DROP_DB_OVER;
return -1;
} }
} }
int32_t code = mndDropDb(pMnode, pReq, pDb); pUser = mndAcquireUser(pMnode, pReq->user);
mndReleaseDb(pMnode, pDb); if (pUser == NULL) {
goto DROP_DB_OVER;
}
if (code != 0) { if (mndCheckAlterDropCompactSyncDbAuth(pUser, pDb) != 0) {
mError("db:%s, failed to drop since %s", pDrop->db, terrstr()); goto DROP_DB_OVER;
return code;
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; code = mndDropDb(pMnode, pReq, pDb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_DB_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop since %s", dropReq.db, terrstr());
}
mndReleaseDb(pMnode, pDb);
mndReleaseUser(pMnode, pUser);
return code;
} }
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgList, int32_t *vgNum) { static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
int32_t vindex = 0; int32_t vindex = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -828,168 +867,236 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgLis ...@@ -828,168 +867,236 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgLis
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) { if (pVgroup->dbUid == pDb->uid) {
SVgroupInfo *pInfo = &vgList[vindex]; SVgroupInfo vgInfo = {0};
pInfo->vgId = htonl(pVgroup->vgId); vgInfo.vgId = pVgroup->vgId;
pInfo->hashBegin = htonl(pVgroup->hashBegin); vgInfo.hashBegin = pVgroup->hashBegin;
pInfo->hashEnd = htonl(pVgroup->hashEnd); vgInfo.hashEnd = pVgroup->hashEnd;
pInfo->epset.numOfEps = pVgroup->replica; vgInfo.epset.numOfEps = pVgroup->replica;
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
SEp *pEp = &pInfo->epset.eps[gid]; SEp *pEp = &vgInfo.epset.eps[gid];
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode != NULL) { if (pDnode != NULL) {
memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
pEp->port = htons(pDnode->port); pEp->port = pDnode->port;
} }
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
if (pVgid->role == TAOS_SYNC_STATE_LEADER) { if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
pInfo->epset.inUse = gid; vgInfo.epset.inUse = gid;
} }
} }
vindex++; vindex++;
taosArrayPush(pVgList, &vgInfo);
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
*vgNum = vindex;
} }
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; int32_t code = -1;
SUseDbReq *pUse = pReq->rpcMsg.pCont; SDbObj *pDb = NULL;
pUse->vgVersion = htonl(pUse->vgVersion); SUserObj *pUser = NULL;
SUseDbReq usedbReq = {0};
SUseDbRsp usedbRsp = {0};
if (tDeserializeSUseDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &usedbReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto USE_DB_OVER;
}
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db); pDb = mndAcquireDb(pMnode, usedbReq.db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mError("db:%s, failed to process use db req since %s", pUse->db, terrstr()); goto USE_DB_OVER;
return -1;
} }
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); pUser = mndAcquireUser(pMnode, pReq->user);
SUseDbRsp *pRsp = rpcMallocCont(contLen); if (pUser == NULL) {
if (pRsp == NULL) { goto USE_DB_OVER;
mndReleaseDb(pMnode, pDb); }
if (mndCheckUseDbAuth(pUser, pDb) != 0) {
goto USE_DB_OVER;
}
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; goto USE_DB_OVER;
}
if (usedbReq.vgVersion < pDb->vgVersion) {
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
} }
int32_t vgNum = 0; memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDb->uid;
usedbRsp.vgVersion = pDb->vgVersion;
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
usedbRsp.hashMethod = pDb->hashMethod;
if (pUse->vgVersion < pDb->vgVersion) { int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp);
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto USE_DB_OVER;
} }
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); tSerializeSUseDbRsp(pRsp, contLen, &usedbRsp);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vgNum);
pRsp->hashMethod = pDb->hashMethod;
pReq->pCont = pRsp; pReq->pCont = pRsp;
pReq->contLen = contLen; pReq->contLen = contLen;
code = 0;
USE_DB_OVER:
if (code != 0) {
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
}
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseUser(pMnode, pUser);
tFreeSUsedbRsp(&usedbRsp);
return 0; return code;
} }
int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen) { int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen) {
SSdb *pSdb = pMnode->pSdb; SUseDbBatchRsp batchUseRsp = {0};
int32_t bufSize = num * (sizeof(SUseDbRsp) + TSDB_DEFAULT_VN_PER_DB * sizeof(SVgroupInfo)); batchUseRsp.pArray = taosArrayInit(numOfDbs, sizeof(SUseDbRsp));
void *buf = malloc(bufSize); if (batchUseRsp.pArray == NULL) {
int32_t len = 0; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t contLen = 0; return -1;
int32_t bufOffset = 0; }
SUseDbRsp *pRsp = NULL;
for (int32_t i = 0; i < num; ++i) {
SDbVgVersion *db = &dbs[i];
db->dbId = be64toh(db->dbId);
db->vgVersion = ntohl(db->vgVersion);
len = 0;
SDbObj *pDb = mndAcquireDb(pMnode, db->dbFName);
if (pDb == NULL) {
mInfo("db %s not exist", db->dbFName);
len = sizeof(SUseDbRsp);
} else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) {
len = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
}
if (0 == len) { for (int32_t i = 0; i < numOfDbs; ++i) {
mndReleaseDb(pMnode, pDb); SDbVgVersion *pDbVgVersion = &pDbs[i];
pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId);
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName);
if (pDb == NULL || pDbVgVersion->vgVersion >= pDb->vgVersion) {
mndReleaseDb(pMnode, pDb);
mDebug("db:%s, no need to use db", pDbVgVersion->dbFName);
continue; continue;
} }
contLen += len; SUseDbRsp usedbRsp = {0};
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (contLen > bufSize) { if (usedbRsp.pVgroupInfos == NULL) {
buf = realloc(buf, contLen); mndReleaseDb(pMnode, pDb);
} mError("db:%s, failed to malloc usedb response", pDb->name);
continue;
pRsp = (SUseDbRsp *)((char *)buf + bufOffset);
memcpy(pRsp->db, db->dbFName, TSDB_DB_FNAME_LEN);
if (pDb) {
int32_t vgNum = 0;
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vgNum);
pRsp->hashMethod = pDb->hashMethod;
} else {
pRsp->uid = htobe64(db->dbId);
pRsp->vgNum = htonl(0);
pRsp->hashMethod = 0;
pRsp->vgVersion = htonl(-1);
} }
bufOffset += len; mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDb->uid;
usedbRsp.vgVersion = pDb->vgVersion;
usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos);
usedbRsp.hashMethod = pDb->hashMethod;
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
} }
if (contLen > 0) { int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp);
*rsp = buf; void *pRsp = malloc(rspLen);
*rspLen = contLen; if (pRsp == NULL) {
} else { terrno = TSDB_CODE_OUT_OF_MEMORY;
*rsp = NULL; tFreeSUseDbBatchRsp(&batchUseRsp);
tfree(buf); return -1;
*rspLen = 0;
} }
tSerializeSUseDbBatchRsp(pRsp, rspLen, &batchUseRsp);
*ppRsp = pRsp;
*pRspLen = rspLen;
tFreeSUseDbBatchRsp(&batchUseRsp);
return 0; return 0;
} }
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) { static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SSyncDbReq *pSync = pReq->rpcMsg.pCont; int32_t code = -1;
SDbObj *pDb = mndAcquireDb(pMnode, pSync->db); SDbObj *pDb = NULL;
SUserObj *pUser = NULL;
SSyncDbReq syncReq = {0};
if (tDeserializeSSyncDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &syncReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto SYNC_DB_OVER;
}
mDebug("db:%s, start to sync", syncReq.db);
pDb = mndAcquireDb(pMnode, syncReq.db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to process sync db req since %s", pSync->db, terrstr()); goto SYNC_DB_OVER;
return -1; }
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto SYNC_DB_OVER;
}
if (mndCheckAlterDropCompactSyncDbAuth(pUser, pDb) != 0) {
goto SYNC_DB_OVER;
}
// code = mndSyncDb();
SYNC_DB_OVER:
if (code != 0) {
mError("db:%s, failed to process sync db req since %s", syncReq.db, terrstr());
} }
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
return 0; mndReleaseUser(pMnode, pUser);
return code;
} }
static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) { static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SCompactDbReq *pCompact = pReq->rpcMsg.pCont; int32_t code = -1;
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db); SDbObj *pDb = NULL;
SUserObj *pUser = NULL;
SCompactDbReq compactReq = {0};
if (tDeserializeSSyncDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &compactReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto SYNC_DB_OVER;
}
mDebug("db:%s, start to sync", compactReq.db);
pDb = mndAcquireDb(pMnode, compactReq.db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to process compact db req since %s", pCompact->db, terrstr()); goto SYNC_DB_OVER;
return -1; }
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto SYNC_DB_OVER;
}
if (mndCheckAlterDropCompactSyncDbAuth(pUser, pDb) != 0) {
goto SYNC_DB_OVER;
}
// code = mndSyncDb();
SYNC_DB_OVER:
if (code != 0) {
mError("db:%s, failed to process compact db req since %s", compactReq.db, terrstr());
} }
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
return 0; mndReleaseUser(pMnode, pUser);
return code;
} }
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
......
...@@ -343,17 +343,21 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { ...@@ -343,17 +343,21 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
char *batchReqStr = pReq->rpcMsg.pCont;
SClientHbBatchReq batchReq = {0}; SClientHbBatchReq batchReq = {0};
tDeserializeSClientHbBatchReq(batchReqStr, &batchReq); if (tDeserializeSClientHbBatchReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &batchReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SArray *pArray = batchReq.reqs; SArray *pArray = batchReq.reqs;
int sz = taosArrayGetSize(pArray); int32_t sz = taosArrayGetSize(pArray);
SClientHbBatchRsp batchRsp = {0}; SClientHbBatchRsp batchRsp = {0};
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SClientHbReq* pHbReq = taosArrayGet(pArray, i); SClientHbReq *pHbReq = taosArrayGet(pArray, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
int32_t kvNum = taosHashGetSize(pHbReq->info); int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) { if (NULL == pHbReq->info || kvNum <= 0) {
...@@ -364,13 +368,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { ...@@ -364,13 +368,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
void *pIter = taosHashIterate(pHbReq->info, NULL); void *pIter = taosHashIterate(pHbReq->info, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SKv* kv = pIter; SKv *kv = pIter;
switch (kv->key) { switch (kv->key) {
case HEARTBEAT_KEY_DBINFO: { case HEARTBEAT_KEY_DBINFO: {
void *rspMsg = NULL; void *rspMsg = NULL;
int32_t rspLen = 0; int32_t rspLen = 0;
mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen); mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) { if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv); taosArrayPush(hbRsp.info, &kv);
...@@ -378,9 +382,9 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { ...@@ -378,9 +382,9 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
break; break;
} }
case HEARTBEAT_KEY_STBINFO: { case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL; void *rspMsg = NULL;
int32_t rspLen = 0; int32_t rspLen = 0;
mndValidateStbInfo(pMnode, (SSTableMetaVersion *)kv->value, kv->valueLen/sizeof(SSTableMetaVersion), &rspMsg, &rspLen); mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) { if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv); taosArrayPush(hbRsp.info, &kv);
...@@ -392,7 +396,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { ...@@ -392,7 +396,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
hbRsp.status = TSDB_CODE_MND_APP_ERROR; hbRsp.status = TSDB_CODE_MND_APP_ERROR;
break; break;
} }
pIter = taosHashIterate(pHbReq->info, pIter); pIter = taosHashIterate(pHbReq->info, pIter);
} }
...@@ -407,15 +411,14 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { ...@@ -407,15 +411,14 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
} }
taosArrayDestroyEx(pArray, tFreeClientHbReq); taosArrayDestroyEx(pArray, tFreeClientHbReq);
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
void* buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
void* abuf = buf; tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps); int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps);
for (int32_t i = 0; i < rspNum; ++i) { for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i); SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info): 0; int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
for (int32_t n = 0; n < kvNum; ++n) { for (int32_t n = 0; n < kvNum; ++n) {
SKv *kv = taosArrayGet(rsp->info, n); SKv *kv = taosArrayGet(rsp->info, n);
tfree(kv->value); tfree(kv->value);
......
...@@ -542,12 +542,18 @@ static void mndTransSendRpcRsp(STrans *pTrans) { ...@@ -542,12 +542,18 @@ static void mndTransSendRpcRsp(STrans *pTrans) {
} }
if (sendRsp && pTrans->rpcHandle != NULL) { if (sendRsp && pTrans->rpcHandle != NULL) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
}
free(pTrans->rpcRsp);
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
pTrans->rpcAHandle); pTrans->rpcAHandle);
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
.code = pTrans->code, .code = pTrans->code,
.ahandle = pTrans->rpcAHandle, .ahandle = pTrans->rpcAHandle,
.pCont = pTrans->rpcRsp, .pCont = rpcCont,
.contLen = pTrans->rpcRspLen}; .contLen = pTrans->rpcRspLen};
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
pTrans->rpcHandle = NULL; pTrans->rpcHandle = NULL;
......
...@@ -127,18 +127,20 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { ...@@ -127,18 +127,20 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
CheckBinary("master", 9); CheckBinary("master", 9);
{ {
int32_t contLen = sizeof(SAlterDbReq); SAlterDbReq alterdbReq = {0};
strcpy(alterdbReq.db, "1.d1");
SAlterDbReq* pReq = (SAlterDbReq*)rpcMallocCont(contLen); alterdbReq.totalBlocks = 12;
strcpy(pReq->db, "1.d1"); alterdbReq.daysToKeep0 = 300;
pReq->totalBlocks = htonl(12); alterdbReq.daysToKeep1 = 400;
pReq->daysToKeep0 = htonl(300); alterdbReq.daysToKeep2 = 500;
pReq->daysToKeep1 = htonl(400); alterdbReq.fsyncPeriod = 4000;
pReq->daysToKeep2 = htonl(500); alterdbReq.walLevel = 2;
pReq->fsyncPeriod = htonl(4000); alterdbReq.quorum = 2;
pReq->walLevel = 2; alterdbReq.cacheLastRow = 1;
pReq->quorum = 2;
pReq->cacheLastRow = 1; int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSAlterDbReq(pReq, contLen, &alterdbReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -196,18 +198,20 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { ...@@ -196,18 +198,20 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
CheckInt8(0); // update CheckInt8(0); // update
{ {
int32_t contLen = sizeof(SDropDbReq); SDropDbReq dropdbReq = {0};
strcpy(dropdbReq.db, "1.d1");
SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDropDbReq(NULL, 0, &dropdbReq);
strcpy(pReq->db, "1.d1"); void* pReq = rpcMallocCont(contLen);
tSerializeSDropDbReq(pReq, contLen, &dropdbReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont; SDropDbRsp dropdbRsp = {0};
pDrop->uid = htobe64(pDrop->uid); tDeserializeSDropDbRsp(pRsp->pCont, pRsp->contLen, &dropdbRsp);
EXPECT_STREQ(pDrop->db, "1.d1"); EXPECT_STREQ(dropdbRsp.db, "1.d1");
} }
test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, "");
...@@ -260,73 +264,74 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { ...@@ -260,73 +264,74 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
uint64_t d2_uid = 0; uint64_t d2_uid = 0;
{ {
int32_t contLen = sizeof(SUseDbReq); SUseDbReq usedbReq = {0};
strcpy(usedbReq.db, "1.d2");
usedbReq.vgVersion = -1;
SUseDbReq* pReq = (SUseDbReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
strcpy(pReq->db, "1.d2"); void* pReq = rpcMallocCont(contLen);
pReq->vgVersion = htonl(-1); tSerializeSUseDbReq(pReq, contLen, &usedbReq);
SRpcMsg* pMsg = test.SendReq(TDMT_MND_USE_DB, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_USE_DB, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont; SUseDbRsp usedbRsp = {0};
EXPECT_STREQ(pRsp->db, "1.d2"); tDeserializeSUseDbRsp(pMsg->pCont, pMsg->contLen, &usedbRsp);
pRsp->uid = htobe64(pRsp->uid); EXPECT_STREQ(usedbRsp.db, "1.d2");
d2_uid = pRsp->uid; EXPECT_EQ(usedbRsp.vgVersion, 1);
pRsp->vgVersion = htonl(pRsp->vgVersion); EXPECT_EQ(usedbRsp.vgNum, 2);
pRsp->vgNum = htonl(pRsp->vgNum); EXPECT_EQ(usedbRsp.hashMethod, 1);
pRsp->hashMethod = pRsp->hashMethod; d2_uid = usedbRsp.uid;
EXPECT_EQ(pRsp->vgVersion, 1);
EXPECT_EQ(pRsp->vgNum, 2);
EXPECT_EQ(pRsp->hashMethod, 1);
{ {
SVgroupInfo* pInfo = &pRsp->vgroupInfo[0]; SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(usedbRsp.pVgroupInfos, 0);
pInfo->vgId = htonl(pInfo->vgId); pInfo->vgId = pInfo->vgId;
pInfo->hashBegin = htonl(pInfo->hashBegin); pInfo->hashBegin = pInfo->hashBegin;
pInfo->hashEnd = htonl(pInfo->hashEnd); pInfo->hashEnd = pInfo->hashEnd;
EXPECT_GT(pInfo->vgId, 0); EXPECT_GT(pInfo->vgId, 0);
EXPECT_EQ(pInfo->hashBegin, 0); EXPECT_EQ(pInfo->hashBegin, 0);
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1);
EXPECT_EQ(pInfo->epset.inUse, 0); EXPECT_EQ(pInfo->epset.inUse, 0);
EXPECT_EQ(pInfo->epset.numOfEps, 1); EXPECT_EQ(pInfo->epset.numOfEps, 1);
SEp* pAddr = &pInfo->epset.eps[0]; SEp* pAddr = &pInfo->epset.eps[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9030); EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost"); EXPECT_STREQ(pAddr->fqdn, "localhost");
} }
{ {
SVgroupInfo* pInfo = &pRsp->vgroupInfo[1]; SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(usedbRsp.pVgroupInfos, 1);
pInfo->vgId = htonl(pInfo->vgId); pInfo->vgId = pInfo->vgId;
pInfo->hashBegin = htonl(pInfo->hashBegin); pInfo->hashBegin = pInfo->hashBegin;
pInfo->hashEnd = htonl(pInfo->hashEnd); pInfo->hashEnd = pInfo->hashEnd;
EXPECT_GT(pInfo->vgId, 0); EXPECT_GT(pInfo->vgId, 0);
EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2); EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2);
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX);
EXPECT_EQ(pInfo->epset.inUse, 0); EXPECT_EQ(pInfo->epset.inUse, 0);
EXPECT_EQ(pInfo->epset.numOfEps, 1); EXPECT_EQ(pInfo->epset.numOfEps, 1);
SEp* pAddr = &pInfo->epset.eps[0]; SEp* pAddr = &pInfo->epset.eps[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9030); EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost"); EXPECT_STREQ(pAddr->fqdn, "localhost");
} }
tFreeSUsedbRsp(&usedbRsp);
} }
{ {
int32_t contLen = sizeof(SDropDbReq); SDropDbReq dropdbReq = {0};
strcpy(dropdbReq.db, "1.d2");
SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDropDbReq(NULL, 0, &dropdbReq);
strcpy(pReq->db, "1.d2"); void* pReq = rpcMallocCont(contLen);
tSerializeSDropDbReq(pReq, contLen, &dropdbReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont; SDropDbRsp dropdbRsp = {0};
pDrop->uid = htobe64(pDrop->uid); tDeserializeSDropDbRsp(pRsp->pCont, pRsp->contLen, &dropdbRsp);
EXPECT_STREQ(pDrop->db, "1.d2"); EXPECT_STREQ(dropdbRsp.db, "1.d2");
EXPECT_EQ(pDrop->uid, d2_uid); EXPECT_EQ(dropdbRsp.uid, d2_uid);
} }
} }
...@@ -96,35 +96,35 @@ TEST_F(MndTestProfile, 03_ConnectMsg_Show) { ...@@ -96,35 +96,35 @@ TEST_F(MndTestProfile, 03_ConnectMsg_Show) {
} }
TEST_F(MndTestProfile, 04_HeartBeatMsg) { TEST_F(MndTestProfile, 04_HeartBeatMsg) {
SClientHbBatchReq batchReq; SClientHbBatchReq batchReq = {0};
batchReq.reqs = taosArrayInit(0, sizeof(SClientHbReq)); batchReq.reqs = taosArrayInit(0, sizeof(SClientHbReq));
SClientHbReq req = {0}; SClientHbReq req = {0};
req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ}; req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ};
req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
SKv kv; SKv kv = {0};
kv.key = 123; kv.key = 123;
kv.value = (void*)"bcd"; kv.value = (void*)"bcd";
kv.valueLen = 4; kv.valueLen = 4;
taosHashPut(req.info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosHashPut(req.info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
taosArrayPush(batchReq.reqs, &req); taosArrayPush(batchReq.reqs, &req);
int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq); int32_t tlen = tSerializeSClientHbBatchReq(NULL, 0, &batchReq);
void* buf = (SClientHbBatchReq*)rpcMallocCont(tlen);
void* buf = (SClientHbBatchReq*)rpcMallocCont(tlen); tSerializeSClientHbBatchReq(buf, tlen, &batchReq);
void* bufCopy = buf;
tSerializeSClientHbBatchReq(&bufCopy, &batchReq);
SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, buf, tlen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, buf, tlen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
char* pRspChar = (char*)pMsg->pCont;
SClientHbBatchRsp rsp = {0}; SClientHbBatchRsp rsp = {0};
tDeserializeSClientHbBatchRsp(pRspChar, &rsp); tDeserializeSClientHbBatchRsp(pMsg->pCont, pMsg->contLen, &rsp);
int sz = taosArrayGetSize(rsp.rsps); int sz = taosArrayGetSize(rsp.rsps);
ASSERT_EQ(sz, 0); ASSERT_EQ(sz, 0);
//SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0);
//EXPECT_EQ(pRsp->connKey.connId, 123); // SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0);
//EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); // EXPECT_EQ(pRsp->connKey.connId, 123);
//EXPECT_EQ(pRsp->status, 0); // EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ);
// EXPECT_EQ(pRsp->status, 0);
#if 0 #if 0
int32_t contLen = sizeof(SHeartBeatReq); int32_t contLen = sizeof(SHeartBeatReq);
......
...@@ -101,7 +101,6 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { ...@@ -101,7 +101,6 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
rpcMsg.msgType = TDMT_MND_CREATE_DB; rpcMsg.msgType = TDMT_MND_CREATE_DB;
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
rpcSendRecv(shandle, pEpSet, &rpcMsg, &rpcRsp); rpcSendRecv(shandle, pEpSet, &rpcMsg, &rpcRsp);
ASSERT_EQ(rpcRsp.code, 0); ASSERT_EQ(rpcRsp.code, 0);
...@@ -252,39 +251,43 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) { ...@@ -252,39 +251,43 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) {
} }
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SUseDbRsp *rspMsg = NULL; // todo SUseDbRsp usedbRsp = {0};
strcpy(usedbRsp.db, ctgTestDbname);
pRsp->code = 0; usedbRsp.vgVersion = ctgTestVgVersion;
pRsp->contLen = sizeof(SUseDbRsp) + ctgTestVgNum * sizeof(SVgroupInfo);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (SUseDbRsp *)pRsp->pCont;
strcpy(rspMsg->db, ctgTestDbname);
rspMsg->vgVersion = htonl(ctgTestVgVersion);
ctgTestCurrentVgVersion = ctgTestVgVersion; ctgTestCurrentVgVersion = ctgTestVgVersion;
rspMsg->vgNum = htonl(ctgTestVgNum); usedbRsp.vgNum = ctgTestVgNum;
rspMsg->hashMethod = 0; usedbRsp.hashMethod = 0;
rspMsg->uid = htobe64(ctgTestDbId); usedbRsp.uid = ctgTestDbId;
usedbRsp.pVgroupInfos = taosArrayInit(usedbRsp.vgNum, sizeof(SVgroupInfo));
SVgroupInfo *vg = NULL; uint32_t hashUnit = UINT32_MAX / ctgTestVgNum;
uint32_t hashUnit = UINT32_MAX / ctgTestVgNum;
for (int32_t i = 0; i < ctgTestVgNum; ++i) { for (int32_t i = 0; i < ctgTestVgNum; ++i) {
vg = &rspMsg->vgroupInfo[i]; SVgroupInfo vg = {0};
vg.vgId = i + 1;
vg->vgId = htonl(i + 1); vg.hashBegin = i * hashUnit;
vg->hashBegin = htonl(i * hashUnit); vg.hashEnd = hashUnit * (i + 1) - 1;
vg->hashEnd = htonl(hashUnit * (i + 1) - 1); if (i == ctgTestVgNum - 1) {
vg->epset.numOfEps = i % TSDB_MAX_REPLICA + 1; vg.hashEnd = htonl(UINT32_MAX);
vg->epset.inUse = i % vg->epset.numOfEps; }
for (int32_t n = 0; n < vg->epset.numOfEps; ++n) {
SEp *addr = &vg->epset.eps[n]; vg.epset.numOfEps = i % TSDB_MAX_REPLICA + 1;
vg.epset.inUse = i % vg.epset.numOfEps;
for (int32_t n = 0; n < vg.epset.numOfEps; ++n) {
SEp *addr = &vg.epset.eps[n];
strcpy(addr->fqdn, "a0"); strcpy(addr->fqdn, "a0");
addr->port = htons(n + 22); addr->port = n + 22;
} }
taosArrayPush(usedbRsp.pVgroupInfos, &vg);
} }
vg->hashEnd = htonl(UINT32_MAX); int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp);
void *pReq = rpcMallocCont(contLen);
tSerializeSUseDbRsp(pReq, contLen, &usedbRsp);
return; pRsp->code = 0;
pRsp->contLen = contLen;
pRsp->pCont = pReq;
} }
void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
......
...@@ -83,78 +83,63 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms ...@@ -83,78 +83,63 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) { if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
SUseDbRsp *pRsp = (SUseDbRsp *)msg;
SUseDbOutput *pOut = (SUseDbOutput *)output; SUseDbOutput *pOut = (SUseDbOutput *)output;
int32_t code = 0; int32_t code = 0;
if (msgSize <= sizeof(*pRsp)) { SUseDbRsp usedbRsp = {0};
qError("invalid use db rsp msg size, msgSize:%d", msgSize); if (tDeserializeSUseDbRsp(msg, msgSize, &usedbRsp) != 0) {
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; qError("invalid use db rsp msg, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG;
} }
pRsp->vgVersion = ntohl(pRsp->vgVersion);
pRsp->vgNum = ntohl(pRsp->vgNum);
pRsp->uid = be64toh(pRsp->uid);
if (pRsp->vgNum < 0) { if (usedbRsp.vgNum < 0) {
qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum); qError("invalid db[%s] vgroup number[%d]", usedbRsp.db, usedbRsp.vgNum);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
int32_t expectSize = pRsp->vgNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp);
if (msgSize != expectSize) {
qError("use db rsp size mis-match, msgSize:%d, expected:%d, vgnumber:%d", msgSize, expectSize, pRsp->vgNum);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo)); pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo));
if (NULL == pOut->dbVgroup) { if (NULL == pOut->dbVgroup) {
qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo)); qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo));
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pOut->dbId = pRsp->uid; pOut->dbId = usedbRsp.uid;
pOut->dbVgroup->vgVersion = pRsp->vgVersion; pOut->dbVgroup->vgVersion = usedbRsp.vgVersion;
pOut->dbVgroup->hashMethod = pRsp->hashMethod; pOut->dbVgroup->hashMethod = usedbRsp.hashMethod;
pOut->dbVgroup->vgHash = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); pOut->dbVgroup->vgHash =
taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup->vgHash) { if (NULL == pOut->dbVgroup->vgHash) {
qError("taosHashInit %d failed", pRsp->vgNum); qError("taosHashInit %d failed", usedbRsp.vgNum);
tfree(pOut->dbVgroup); tfree(pOut->dbVgroup);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < pRsp->vgNum; ++i) { for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
pRsp->vgroupInfo[i].vgId = ntohl(pRsp->vgroupInfo[i].vgId); SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
pRsp->vgroupInfo[i].hashBegin = ntohl(pRsp->vgroupInfo[i].hashBegin);
pRsp->vgroupInfo[i].hashEnd = ntohl(pRsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < pRsp->vgroupInfo[i].epset.numOfEps; ++n) { if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
qError("taosHashPut failed"); qError("taosHashPut failed");
goto _return; goto _return;
} }
} }
memcpy(pOut->db, pRsp->db, sizeof(pOut->db)); memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN);
return code; return code;
_return: _return:
taosArrayDestroy(usedbRsp.pVgroupInfos);
if (pOut) { if (pOut) {
taosHashCleanup(pOut->dbVgroup->vgHash); taosHashCleanup(pOut->dbVgroup->vgHash);
tfree(pOut->dbVgroup); tfree(pOut->dbVgroup);
} }
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册