diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 75e819521534d6b30758c0b26374b7adb1ecdabc..97ba56bd76aacc9c7b3b42214006f19808c5f41d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -76,6 +76,12 @@ typedef enum { HEARTBEAT_TYPE_MAX } EHbType; +typedef enum { + HEARTBEAT_KEY_DBINFO = 1, + HEARTBEAT_KEY_STBINFO, +}; + + typedef enum _mgmt_table { TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_ACCT, @@ -1335,9 +1341,8 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat } typedef struct { - int32_t keyLen; + int32_t key; int32_t valueLen; - void* key; void* value; } SKv; @@ -1359,8 +1364,7 @@ typedef struct { typedef struct { SClientHbKey connKey; int32_t status; - int32_t bodyLen; - void* body; + SArray* info; // Array } SClientHbRsp; typedef struct { @@ -1402,17 +1406,15 @@ void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKv->keyLen); + tlen += taosEncodeFixedI32(buf, pKv->key); tlen += taosEncodeFixedI32(buf, pKv->valueLen); - tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); return tlen; } static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { - buf = taosDecodeFixedI32(buf, &pKv->keyLen); + buf = taosDecodeFixedI32(buf, &pKv->key); buf = taosDecodeFixedI32(buf, &pKv->valueLen); - buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); return buf; } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 53ef6f4f9ba8088a5d83a2b52ef2f3fefa4259f2..6498abb7fc6dc816abbc54567b58293c82d2f1ea 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -94,7 +94,7 @@ typedef struct SUseDbOutput { } SUseDbOutput; enum { - META_TYPE_NON_TABLE = 1, + META_TYPE_NULL_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, META_TYPE_BOTH_TABLE @@ -174,7 +174,7 @@ extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSi extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); -#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE +#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index c61f3da6bd456058e9754918be10907486dd7b77..a25d79bf30ccb8e75a803848851c845e5e88adf4 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -37,7 +37,15 @@ typedef struct SAppInstInfo SAppInstInfo; typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); +typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req); + +typedef struct SHbConnInfo { + void *param; + SClientHbReq *req; +} SHbConnInfo; + typedef struct SAppHbMgr { + char *key; // statistics int32_t reportCnt; int32_t connKeyCnt; @@ -49,7 +57,7 @@ typedef struct SAppHbMgr { SAppInstInfo* pAppInstInfo; // info SHashObj* activeInfo; // hash - SHashObj* getInfoFuncs; // hash + SHashObj* connInfo; // hash } SAppHbMgr; typedef struct SClientHbMgr { @@ -59,12 +67,10 @@ typedef struct SClientHbMgr { pthread_t thread; pthread_mutex_t lock; // used when app init and cleanup SArray* appHbMgrs; // SArray one for each cluster - FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; + FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX]; + FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; } SClientHbMgr; -// TODO: embed param into function -// return type: SArray -typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param); typedef struct SQueryExecMetric { int64_t start; // start timestamp diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0f4ff6f72571a31eccaac783f6117baa7761f3bd..8fc9fd086ce3c2443d2c535558237424b7235c3d 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -21,27 +21,136 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(SClientHbRsp* pRsp) { +static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { return 0; } +static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { + SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); + if (NULL == info) { + tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); + return TSDB_CODE_SUCCESS; + } + + int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; + for (int32_t i = 0; i < kvNum; ++i) { + SKv *kv = taosArrayGet(pRsp->info, i); + switch (kv->key) { + case HEARTBEAT_KEY_DBINFO: + + break; + case HEARTBEAT_KEY_STBINFO: + + break; + default: + tscError("invalid hb key type:%d", kv->key); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { if (code != 0) { return -1; } - SClientHbRsp* pRsp = (SClientHbRsp*) pMsg->pData; - return hbMqHbRspHandle(pRsp); + char *key = (char *)param; + SClientHbBatchRsp* pRsp = (SClientHbBatchRsp*) pMsg->pData; + int32_t reqNum = taosArrayGetSize(pRsp->rsps); + + SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); + if (pInst == NULL || NULL == *pInst) { + tscError("cluster not exist, key:%s", key); + return -1; + } + + for (int32_t i = 0; i < reqNum; ++i) { + SClientHbRsp* rsp = taosArrayGet(pRsp->rsps, i); + code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); + if (code) { + break; + } + } + + return code; } -void hbMgrInitMqHbRspHandle() { - clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; +int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { + SDbVgVersion *dbs = NULL; + uint32_t dbNum = 0; + int32_t code = 0; + + code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + for (int32_t i = 0; i < dbNum; ++i) { + SDbVgVersion *db = &dbs[i]; + db->dbId = htobe64(db->dbId); + db->vgVersion = htonl(db->vgVersion); + } + + SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + + return TSDB_CODE_SUCCESS; +} + +int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { + int64_t *clusterId = (int64_t *)param; + struct SCatalog *pCatalog = NULL; + + int32_t code = catalogGetHandle(*clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + return code; + } + + code = hbGetExpiredDBInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + + return TSDB_CODE_SUCCESS; +} + +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { + +} + +void hbMgrInitMqHbHandle() { + clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; + clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle; + clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle; + clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; } static FORCE_INLINE void hbMgrInitHandle() { // init all handle - hbMgrInitMqHbRspHandle(); + hbMgrInitMqHbHandle(); } + +void hbFreeReqKvHash(SHashObj* info) { + void *pIter = taosHashIterate(info, NULL); + while (pIter != NULL) { + SKv* kv = pIter; + + tfree(kv->value); + + pIter = taosHashIterate(info, pIter); + } +} + +void hbFreeReq(SClientHbReq *req) { + hbFreeReqKvHash(req->info); +} + + + SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { @@ -51,30 +160,48 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + int32_t code = 0; void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { SClientHbReq* pOneReq = pIter; + + SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); + if (info) { + code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); + if (code) { + taosHashCancelIterate(pAppHbMgr->activeInfo, pIter); + break; + } + } + taosArrayPush(pBatchReq->reqs, pOneReq); - taosHashClear(pOneReq->info); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } -#if 0 - pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); - while (pIter != NULL) { - FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; - SClientHbKey connKey; - taosHashCopyKey(pIter, &connKey); - SArray* pArray = getConnInfoFp(connKey, NULL); - - pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter); + if (code) { + taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq); + tfree(pBatchReq); } -#endif return pBatchReq; } + +void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); + while (pIter != NULL) { + SClientHbReq* pOneReq = pIter; + + hbFreeReqKvHash(pOneReq->info); + taosHashClear(pOneReq->info); + + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); + } +} + + + static void* hbThreadFunc(void* param) { setThreadName("hb"); while (1) { @@ -98,7 +225,9 @@ static void* hbThreadFunc(void* param) { int tlen = tSerializeSClientHbBatchReq(NULL, pReq); void *buf = malloc(tlen); if (buf == NULL) { - //TODO: error handling + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); break; } void *abuf = buf; @@ -107,6 +236,7 @@ static void* hbThreadFunc(void* param) { if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); free(buf); break; } @@ -114,7 +244,7 @@ static void* hbThreadFunc(void* param) { pInfo->msgInfo.pData = buf; pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_HEARTBEAT; - pInfo->param = NULL; + pInfo->param = strdup(pAppHbMgr->key); pInfo->requestId = generateRequestId(); pInfo->requestObjRefId = 0; @@ -148,7 +278,7 @@ static void hbStopThread() { atomic_store_8(&clientHbMgr.threadStop, 1); } -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { +SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { hbMgrInit(); SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -160,6 +290,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { pAppHbMgr->connKeyCnt = 0; pAppHbMgr->reportCnt = 0; pAppHbMgr->reportBytes = 0; + pAppHbMgr->key = strdup(key); // init app info pAppHbMgr->pAppInstInfo = pAppInstInfo; @@ -243,7 +374,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp) { return 0; } -int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) { +int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { // init hash in activeinfo void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { @@ -252,16 +383,42 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func SClientHbReq hbReq; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + // init hash - if (func != NULL) { - taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); + if (info != NULL) { + SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + info->req = pReq; + taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)); } atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); return 0; } +int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { + SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; + SHbConnInfo info = {0}; + + switch (hbType) { + case HEARTBEAT_TYPE_QUERY: { + int64_t *pClusterId = malloc(sizeof(int64_t)); + *pClusterId = clusterId; + + info.param = pClusterId; + break; + } + case HEARTBEAT_TYPE_MQ: { + break; + } + default: + break; + } + + return hbRegisterConnImpl(pAppHbMgr, connKey, &info); +} + void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey)); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 6014042e11cb7ce25b81eb671f8a7ad7eb379252..a39a0d637bfc948af6d1f29db395e1af99f9bdf1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - p->pAppHbMgr = appHbMgrInit(p); + p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index ec088eb0735824f3348287ec54fd476f00dc7adb..831006ac8933ae96890122b29bc414c164a51ae0 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -71,8 +71,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 48e9dce3c14eafb4b4c81d17d445767236224785..1e9131c8005d1c5ed962ba064af38d169a177bba 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -91,13 +91,11 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int32_t kvNum = taosHashGetSize(pReq->info); tlen += taosEncodeFixedI32(buf, kvNum); - SKv kv; + SKv *kv; void* pIter = taosHashIterate(pReq->info, NULL); while (pIter != NULL) { - taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); - kv.valueLen = taosHashGetDataLen(pIter); - kv.value = pIter; - tlen += taosEncodeSKv(buf, &kv); + kv = pIter; + tlen += taosEncodeSKv(buf, kv); pIter = taosHashIterate(pReq->info, pIter); } @@ -116,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { for(int i = 0; i < kvNum; i++) { SKv kv; buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); + taosHashPut(pReq->info, kv.key, sizeof(kv.key), kv.value, kv.valueLen); } return buf; @@ -124,17 +122,28 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp) { int tlen = 0; + int32_t kvNum = taosArrayGetSize(pRsp->info); tlen += taosEncodeSClientHbKey(buf, &pRsp->connKey); tlen += taosEncodeFixedI32(buf, pRsp->status); - tlen += taosEncodeFixedI32(buf, pRsp->bodyLen); - tlen += taosEncodeBinary(buf, pRsp->body, pRsp->bodyLen); + tlen += taosEncodeFixedI32(buf, kvNum); + for (int i = 0; i < kvNum; i++) { + SKv *kv = (SKv *)taosArrayGet(pRsp->info, i); + tlen += taosEncodeSKv(buf, kv); + } return tlen; } void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp) { + int32_t kvNum = 0; buf = taosDecodeSClientHbKey(buf, &pRsp->connKey); buf = taosDecodeFixedI32(buf, &pRsp->status); - buf = taosDecodeFixedI32(buf, &pRsp->bodyLen); - buf = taosDecodeBinary(buf, &pRsp->body, pRsp->bodyLen); + buf = taosDecodeFixedI32(buf, &kvNum); + pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); + for (int i = 0; i < kvNum; i++) { + SKv kv = {0}; + buf = taosDecodeSKv(buf, &kv); + taosArrayPush(pRsp->info, &kv); + } + return buf; } @@ -155,6 +164,7 @@ void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pBatchReq) { if (pBatchReq->reqs == NULL) { pBatchReq->reqs = taosArrayInit(0, sizeof(SClientHbReq)); } + int32_t reqNum; buf = taosDecodeFixedI32(buf, &reqNum); for (int i = 0; i < reqNum; i++) { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 39be41a4e50436e6734aefb32fb3da5861025963..5917093c7b978be7490b568d5e79bdcdcf5e4010 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -805,6 +805,44 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SVgroupInfo *vgList, int32_t *vgNum) { + int32_t vindex = 0; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (vindex < pDb->cfg.numOfVgroups) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + SVgroupInfo *pInfo = &vgList[vindex]; + pInfo->vgId = htonl(pVgroup->vgId); + pInfo->hashBegin = htonl(pVgroup->hashBegin); + pInfo->hashEnd = htonl(pVgroup->hashEnd); + pInfo->numOfEps = pVgroup->replica; + for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; + SEpAddr *pEpArrr = &pInfo->epAddr[gid]; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode != NULL) { + memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + pEpArrr->port = htons(pDnode->port); + } + mndReleaseDnode(pMnode, pDnode); + if (pVgid->role == TAOS_SYNC_STATE_LEADER) { + pInfo->inUse = gid; + } + } + vindex++; + } + + sdbRelease(pSdb, pVgroup); + } + + *vgNum = vindex; +} + static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -826,45 +864,16 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { return -1; } - int32_t vindex = 0; + int32_t vgNum = 0; if (pUse->vgVersion < pDb->vgVersion) { - void *pIter = NULL; - while (vindex < pDb->cfg.numOfVgroups) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - if (pVgroup->dbUid == pDb->uid) { - SVgroupInfo *pInfo = &pRsp->vgroupInfo[vindex]; - pInfo->vgId = htonl(pVgroup->vgId); - pInfo->hashBegin = htonl(pVgroup->hashBegin); - pInfo->hashEnd = htonl(pVgroup->hashEnd); - pInfo->numOfEps = pVgroup->replica; - for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { - SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; - SEpAddr *pEpArrr = &pInfo->epAddr[gid]; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pDnode != NULL) { - memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - pEpArrr->port = htons(pDnode->port); - } - mndReleaseDnode(pMnode, pDnode); - if (pVgid->role == TAOS_SYNC_STATE_LEADER) { - pInfo->inUse = gid; - } - } - vindex++; - } - - sdbRelease(pSdb, pVgroup); - } + mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); } memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); pRsp->uid = htobe64(pDb->uid); pRsp->vgVersion = htonl(pDb->vgVersion); - pRsp->vgNum = htonl(vindex); + pRsp->vgNum = htonl(vgNum); pRsp->hashMethod = pDb->hashMethod; pReq->pCont = pRsp; @@ -874,6 +883,72 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { return 0; } +int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen) { + SSdb *pSdb = pMnode->pSdb; + int32_t bufSize = num * (sizeof(SUseDbRsp) + TSDB_DEFAULT_VN_PER_DB * sizeof(SVgroupInfo)); + void *buf = malloc(bufSize); + int32_t len = 0; + int32_t contLen = 0; + int32_t bufOffset = 0; + SUseDbRsp *pRsp = NULL; + + for (int32_t i = 0; i < num; ++i) { + SDbVgVersion *db = &dbs[i]; + + len = 0; + + SDbObj *pDb = mndAcquireDb(pMnode, db->dbName); + if (pDb == NULL) { + mInfo("db %s not exist", db->dbName); + + len = sizeof(SUseDbRsp); + } else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) { + len = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); + } + + if (0 == len) { + mndReleaseDb(pMnode, pDb); + + continue; + } + + contLen += len; + + if (contLen > bufSize) { + buf = realloc(buf, contLen); + } + + pRsp = (SUseDbRsp *)((char *)buf + bufOffset); + memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN); + if (pDb) { + int32_t vgNum = 0; + mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); + + pRsp->uid = htobe64(pDb->uid); + pRsp->vgVersion = htonl(pDb->vgVersion); + pRsp->vgNum = htonl(vgNum); + pRsp->hashMethod = pDb->hashMethod; + } else { + pRsp->vgVersion = htonl(-1); + } + + bufOffset += len; + + mndReleaseDb(pMnode, pDb); + } + + if (contLen > 0) { + *rsp = buf; + *rspLen = contLen; + } else { + *rsp = NULL; + tfree(buf); + *rspLen = 0; + } + + return 0; +} + static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; SSyncDbReq *pSync = pReq->rpcMsg.pCont; @@ -1166,4 +1241,4 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 76fabc96ceac4a8b89cebc7036d651914bfa06a0..8296cd2c6c90afb24b9167f7f3c0da1b02b34bb7 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -346,12 +346,46 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { int sz = taosArrayGetSize(pArray); SClientHbBatchRsp batchRsp = {0}; + batchRsp.key = batchReq.key; + batchRsp.keyLen = batchReq.keyLen; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); for (int i = 0; i < sz; i++) { SClientHbReq* pHbReq = taosArrayGet(pArray, i); if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { + int32_t kvNum = taosHashGetSize(pHbReq->info); + if (NULL == pHbReq->info || kvNum <= 0) { + continue; + } + SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))}; + + void *pIter = taosHashIterate(pHbReq->info, NULL); + while (pIter != NULL) { + SKv* kv = pIter; + + switch (kv->key) { + case HEARTBEAT_KEY_DBINFO: + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp->info, &kv); + + taosArrayPush(batchRsp.rsps, &hbRsp); + } + break; + case HEARTBEAT_KEY_STBINFO: + + break; + default: + mError("invalid kv key:%d", kv->key); + break; + } + + pIter = taosHashIterate(pHbReq->info, pIter); + } } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); if (pRsp != NULL) { @@ -366,6 +400,19 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { void* buf = rpcMallocCont(tlen); void* abuf = buf; tSerializeSClientHbBatchRsp(&abuf, &batchRsp); + + int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps); + for (int32_t i = 0; i < rspNum; ++i) { + SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i); + int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info): 0; + for (int32_t n = 0; n < kvNum; ++n) { + SKv *kv = taosArrayGet(rsp->info, n); + tfree(kv->value); + } + taosArrayDestroy(rsp->info); + } + + tfree(batchRsp.key); taosArrayDestroy(batchRsp.rsps); pReq->contLen = tlen; pReq->pCont = buf; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9f1ea754c28efa25e1e4bc0f374dee90e2595363..a96b12f597508843ebb977645e721e6178323425 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -115,7 +115,7 @@ typedef struct SCatalogMgmt { typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); -#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE) +#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE) #define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE) #define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE) #define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index d052e045524977c36511fba9e2c98f16b831d65d..3ce71ccb440054c7c7d3c3fccf5c82d8f9aa176f 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -253,7 +253,7 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - SET_META_TYPE_NONE(output->metaType); + SET_META_TYPE_NULL(output->metaType); ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName); return TSDB_CODE_SUCCESS; } @@ -315,7 +315,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { - SET_META_TYPE_NONE(output->metaType); + SET_META_TYPE_NULL(output->metaType); ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName)); return TSDB_CODE_SUCCESS; } @@ -510,14 +510,14 @@ int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t s CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { - qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (slot->needSort) { taosArraySort(slot->meta, compare); slot->needSort = false; - qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type); + qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); } void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); @@ -542,6 +542,42 @@ _return: CTG_RET(code); } +int32_t ctgMetaRentRemove(SMetaRentMgmt *mgmt, int64_t id, __compar_fn_t compare) { + int16_t widx = abs(id % mgmt->slotNum); + + SRentSlotInfo *slot = &mgmt->slots[widx]; + int32_t code = 0; + + CTG_LOCK(CTG_WRITE, &slot->lock); + if (NULL == slot->meta) { + qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (slot->needSort) { + taosArraySort(slot->meta, compare); + slot->needSort = false; + qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); + } + + int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ); + if (idx < 0) { + qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + taosArrayRemove(slot->meta, idx); + + qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + +_return: + + CTG_UNLOCK(CTG_WRITE, &slot->lock); + + CTG_RET(code); +} + + int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1); if (ridx >= mgmt->slotNum) { @@ -763,44 +799,49 @@ int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } -int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target) { - SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName)); - if (info) { - CTG_LOCK(CTG_WRITE, &info->lock); - if (info->dbId != target->dbId) { - ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); - CTG_UNLOCK(CTG_WRITE, &info->lock); - taosHashRelease(pCatalog->dbCache.cache, info); - - return TSDB_CODE_SUCCESS; - } - - if (info->vgVersion > target->vgVersion) { - ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion); - CTG_UNLOCK(CTG_WRITE, &info->lock); - taosHashRelease(pCatalog->dbCache.cache, info); - - return TSDB_CODE_SUCCESS; - } - - if (info->vgInfo) { - ctgInfo("cleanup db vgInfo, db:%s", target->dbName); - taosHashCleanup(info->vgInfo); - info->vgInfo = NULL; - } +int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target, bool *removed) { + *removed = false; - if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) { - ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName); - CTG_UNLOCK(CTG_WRITE, &info->lock); - taosHashRelease(pCatalog->dbCache.cache, info); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - + SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName)); + if (NULL == info) { + ctgInfo("db not exist in dbCache, may be removed, db:%s", target->dbName); + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_WRITE, &info->lock); + if (info->dbId != target->dbId) { + ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + return TSDB_CODE_SUCCESS; + } + + if (info->vgVersion > target->vgVersion) { + ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion); CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + return TSDB_CODE_SUCCESS; + } + if (info->vgInfo) { + ctgInfo("cleanup db vgInfo, db:%s", target->dbName); + taosHashCleanup(info->vgInfo); + info->vgInfo = NULL; + } + + if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) { + ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName); + CTG_UNLOCK(CTG_WRITE, &info->lock); taosHashRelease(pCatalog->dbCache.cache, info); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } + + CTG_UNLOCK(CTG_WRITE, &info->lock); + + taosHashRelease(pCatalog->dbCache.cache, info); + *removed = true; + return TSDB_CODE_SUCCESS; } @@ -825,7 +866,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con // if get from mnode failed, will not try vnode CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); - if (CTG_IS_META_NONE(moutput.metaType)) { + if (CTG_IS_META_NULL(moutput.metaType)) { CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); } else { output = &moutput; @@ -841,6 +882,8 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); + voutput.metaType = moutput.metaType; + tfree(voutput.tbMeta); voutput.tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; @@ -850,20 +893,22 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con if (0 == exist) { CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); - if (CTG_IS_META_NONE(moutput.metaType)) { - SET_META_TYPE_NONE(voutput.metaType); + if (CTG_IS_META_NULL(moutput.metaType)) { + SET_META_TYPE_NULL(voutput.metaType); } tfree(voutput.tbMeta); voutput.tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; } else { + tfree(voutput.tbMeta); + SET_META_TYPE_CTABLE(voutput.metaType); } } } - if (CTG_IS_META_NONE(output->metaType)) { + if (CTG_IS_META_NULL(output->metaType)) { ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } @@ -1221,43 +1266,26 @@ _return: int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) { int32_t code = 0; + bool removed = false; if (NULL == pCatalog || NULL == dbInfo) { CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } - if (pCatalog->dbCache.cache) { - CTG_ERR_JRET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo)); - - CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); + if (NULL == pCatalog->dbCache.cache) { + return TSDB_CODE_SUCCESS; } - ctgWarn("db removed from cache, db:%s", dbName); - - bool newAdded = false; - if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { - ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - dbInfo->vgInfo = NULL; - - SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; - if (newAdded) { - CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion))); - } else { - CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + CTG_ERR_RET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo, &removed)); + if (!removed) { + return TSDB_CODE_SUCCESS; } - ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion); - + ctgInfo("db removed from cache, db:%s", dbInfo->dbName); -_return: - - if (dbInfo && dbInfo->vgInfo) { - taosHashCleanup(dbInfo->vgInfo); - dbInfo->vgInfo = NULL; - } + CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbInfo->dbId, ctgDbVgVersionCompare)); + + ctgDebug("db removed from rent, db:%s", dbInfo->dbName); CTG_RET(code); } @@ -1365,6 +1393,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup)); _return: + if (dbInfo) { CTG_UNLOCK(CTG_READ, &dbInfo->lock); taosHashRelease(pCatalog->dbCache.cache, dbInfo);