diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 97ba56bd76aacc9c7b3b42214006f19808c5f41d..1316847f485006f974431ae1e673c0761f41866b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -76,9 +76,10 @@ typedef enum { HEARTBEAT_TYPE_MAX } EHbType; -typedef enum { +enum { HEARTBEAT_KEY_DBINFO = 1, HEARTBEAT_KEY_STBINFO, + HEARTBEAT_KEY_MQ_TMP, }; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 9c9e370dcaae1dffb64b29c36a002216d6ffe559..0b465b7b4e01f65d41fda565ca43f16c1e9b7853 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -99,6 +99,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo); + /** * Get a table's meta data. * @param pCatalog (input, got with catalogGetHandle) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a25d79bf30ccb8e75a803848851c845e5e88adf4..b346e54addabaa0ed2e478127b6b3359c1434557 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -35,10 +35,6 @@ extern "C" { typedef struct SAppInstInfo SAppInstInfo; -typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); - -typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req); - typedef struct SHbConnInfo { void *param; SClientHbReq *req; @@ -60,6 +56,12 @@ typedef struct SAppHbMgr { SHashObj* connInfo; // hash } SAppHbMgr; + +typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp); + +typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req); + + typedef struct SClientHbMgr { int8_t inited; // ctl @@ -223,11 +225,11 @@ void hbMgrCleanUp(); int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo); +SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key); void appHbMgrCleanup(SAppHbMgr* pAppHbMgr); // conn level -int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func); +int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 8fc9fd086ce3c2443d2c535558237424b7235c3d..0ab3089f13cf619acccc820236c9b8534cffbb43 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -15,6 +15,8 @@ #include "clientInt.h" #include "trpc.h" +#include "catalog.h" +#include "clientLog.h" static SClientHbMgr clientHbMgr = {0}; @@ -25,6 +27,67 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) return 0; } +static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { + int32_t msgLen = 0; + int32_t code = 0; + + while (msgLen < valueLen) { + SUseDbRsp *rsp = (SUseDbRsp *)((char *)value + msgLen); + + rsp->vgVersion = ntohl(rsp->vgVersion); + rsp->vgNum = ntohl(rsp->vgNum); + rsp->uid = be64toh(rsp->uid); + + if (rsp->vgVersion < 0) { + SDbVgVersion dbInfo; + strcpy(dbInfo.dbName, rsp->db); + dbInfo.dbId = rsp->uid; + dbInfo.vgVersion = rsp->vgVersion; + + code = catalogRemoveDBVgroup(pCatalog, &dbInfo); + } else { + SDBVgroupInfo vgInfo = {0}; + vgInfo.dbId = rsp->uid; + vgInfo.vgVersion = rsp->vgVersion; + vgInfo.hashMethod = rsp->hashMethod; + vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (NULL == vgInfo.vgInfo) { + tscError("hash init[%d] failed", rsp->vgNum); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < rsp->vgNum; ++i) { + rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId); + rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin); + rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd); + + for (int32_t n = 0; n < rsp->vgroupInfo[i].numOfEps; ++n) { + rsp->vgroupInfo[i].epAddr[n].port = ntohs(rsp->vgroupInfo[i].epAddr[n].port); + } + + if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) { + tscError("hash push failed, errno:%d", errno); + taosHashCleanup(vgInfo.vgInfo); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo); + if (code) { + taosHashCleanup(vgInfo.vgInfo); + } + } + + if (code) { + return code; + } + + msgLen += sizeof(SUseDbRsp) + rsp->vgNum * sizeof(SVgroupInfo); + } + + return TSDB_CODE_SUCCESS; +} + static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { @@ -36,9 +99,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pRsp->info, i); switch (kv->key) { - case HEARTBEAT_KEY_DBINFO: - + case HEARTBEAT_KEY_DBINFO: { + if (kv->valueLen <= 0 || NULL == kv->value) { + tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value); + break; + } + + int64_t *clusterId = (int64_t *)info->param; + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(*clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + break; + } + + hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); break; + } case HEARTBEAT_KEY_STBINFO: break; @@ -56,8 +134,10 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code return -1; } char *key = (char *)param; - SClientHbBatchRsp* pRsp = (SClientHbBatchRsp*) pMsg->pData; - int32_t reqNum = taosArrayGetSize(pRsp->rsps); + SClientHbBatchRsp pRsp = {0}; + tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp); + + int32_t reqNum = taosArrayGetSize(pRsp.rsps); SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { @@ -66,7 +146,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code } for (int32_t i = 0; i < reqNum; ++i) { - SClientHbRsp* rsp = taosArrayGet(pRsp->rsps, i); + SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); if (code) { break; @@ -145,8 +225,9 @@ void hbFreeReqKvHash(SHashObj* info) { } } -void hbFreeReq(SClientHbReq *req) { - hbFreeReqKvHash(req->info); +void hbFreeReq(void *req) { + SClientHbReq *pReq = (SClientHbReq *)req; + hbFreeReqKvHash(pReq->info); } @@ -305,9 +386,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { } pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq; // init getInfoFunc - pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - if (pAppHbMgr->getInfoFuncs == NULL) { + if (pAppHbMgr->connInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; free(pAppHbMgr); return NULL; @@ -325,7 +406,7 @@ void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) { SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == pTarget) { taosHashCleanup(pTarget->activeInfo); - taosHashCleanup(pTarget->getInfoFuncs); + taosHashCleanup(pTarget->connInfo); } } @@ -357,23 +438,6 @@ void hbMgrCleanUp() { taosArrayDestroy(clientHbMgr.appHbMgrs); } -int hbHandleRsp(SClientHbBatchRsp* hbRsp) { - int64_t reqId = hbRsp->reqId; - int64_t rspId = hbRsp->rspId; - - SArray* rsps = hbRsp->rsps; - int32_t sz = taosArrayGetSize(rsps); - for (int i = 0; i < sz; i++) { - SClientHbRsp* pRsp = taosArrayGet(rsps, i); - if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) { - clientHbMgr.handle[pRsp->connKey.hbType](pRsp); - } else { - // discard rsp - } - } - return 0; -} - int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { // init hash in activeinfo void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); @@ -421,7 +485,7 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3 void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey)); + taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a39a0d637bfc948af6d1f29db395e1af99f9bdf1..ce0e5949557ec585240b52f146e8eb3fbd9b1000 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -434,13 +434,8 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { return NULL; } SKv kv = {0}; - kv.key = malloc(256); - if (kv.key == NULL) { - taosArrayDestroy(pArray); - return NULL; - } - strcpy(kv.key, "mq-tmp"); - kv.keyLen = strlen("mq-tmp") + 1; + kv.key = HEARTBEAT_KEY_MQ_TMP; + SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg)); if (pMqHb == NULL) { return pArray; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1e9131c8005d1c5ed962ba064af38d169a177bba..0d1576d03c26081b2b8852ecbfc4ada55e17d8fe 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -114,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { for(int i = 0; i < kvNum; i++) { SKv kv; buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, kv.key, sizeof(kv.key), kv.value, kv.valueLen); + taosHashPut(pReq->info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen); } return buf; diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index 91f502be7d94e29c410834bedf8a10eddf76783b..5a3e6ed26e4b65875c74a00024f3cbdd3319db16 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -26,6 +26,7 @@ int32_t mndInitDb(SMnode *pMnode); void mndCleanupDb(SMnode *pMnode); SDbObj *mndAcquireDb(SMnode *pMnode, char *db); void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); +int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 8296cd2c6c90afb24b9167f7f3c0da1b02b34bb7..49265fa35ebe2674744468c4fab03f8db8ec8e46 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -346,8 +346,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { int sz = taosArrayGetSize(pArray); SClientHbBatchRsp batchRsp = {0}; - batchRsp.key = batchReq.key; - batchRsp.keyLen = batchReq.keyLen; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); for (int i = 0; i < sz; i++) { @@ -365,17 +363,18 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { SKv* kv = pIter; switch (kv->key) { - case HEARTBEAT_KEY_DBINFO: + case HEARTBEAT_KEY_DBINFO: { void *rspMsg = NULL; int32_t rspLen = 0; mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen); if (rspMsg && rspLen > 0) { SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp->info, &kv); + taosArrayPush(hbRsp.info, &kv); taosArrayPush(batchRsp.rsps, &hbRsp); } break; + } case HEARTBEAT_KEY_STBINFO: break; @@ -412,7 +411,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { taosArrayDestroy(rsp->info); } - tfree(batchRsp.key); taosArrayDestroy(batchRsp.rsps); pReq->contLen = tlen; pReq->pCont = buf; diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 4ad979cdd33a8e4d128737eb8697f47314328c41..991b4c1249ab7e390817cd7bdf18011efe99b137 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -102,11 +102,10 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ}; req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); SKv kv; - kv.key = (void*)"abc"; - kv.keyLen = 4; + kv.key = 123; kv.value = (void*)"bcd"; kv.valueLen = 4; - taosHashPut(req.info, kv.key, kv.keyLen, kv.value, kv.valueLen); + taosHashPut(req.info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen); taosArrayPush(batchReq.reqs, &req); int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 3ce71ccb440054c7c7d3c3fccf5c82d8f9aa176f..07086e49ee6e50141bb5510c8661c64893ee0f53 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1236,6 +1236,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB } bool newAdded = false; + + dbInfo->lock = 0; if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); @@ -1269,7 +1271,7 @@ int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) { bool removed = false; if (NULL == pCatalog || NULL == dbInfo) { - CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } if (NULL == pCatalog->dbCache.cache) {