From 78e49d6b08d3b8c72c726c141d835e1a4d6501a0 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 26 Jan 2022 17:42:31 +0800 Subject: [PATCH] feature/qnode --- include/common/tmsg.h | 38 ++++++++++++++++- source/client/src/clientHb.c | 51 +++++++++++++++-------- source/client/src/clientMsgHandler.c | 14 ++++++- source/common/src/tmsg.c | 2 +- source/dnode/mnode/impl/src/mndDb.c | 5 +++ source/dnode/mnode/impl/src/mndProfile.c | 5 ++- source/libs/catalog/inc/catalogInt.h | 2 +- source/libs/catalog/src/catalog.c | 23 +++++----- source/libs/executor/src/dataDispatcher.c | 1 + source/util/src/terror.c | 2 +- 10 files changed, 107 insertions(+), 36 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 210804143b..8e62b4d0db 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1391,9 +1391,26 @@ void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); + +static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { + void *pIter = taosHashIterate(info, NULL); + while (pIter != NULL) { + SKv* kv = (SKv*)pIter; + + tfree(kv->value); + + pIter = taosHashIterate(info, pIter); + } +} + + static FORCE_INLINE void tFreeClientHbReq(void *pReq) { SClientHbReq* req = (SClientHbReq*)pReq; - if (req->info) taosHashCleanup(req->info); + if (req->info) { + tFreeReqKvHash(req->info); + + taosHashCleanup(req->info); + } } int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); @@ -1409,6 +1426,25 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { free(pReq); } +static FORCE_INLINE void tFreeClientKv(void *pKv) { + SKv *kv = (SKv *)pKv; + if (kv) { + tfree(kv->value); + } +} + +static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) { + SClientHbRsp* rsp = (SClientHbRsp*)pRsp; + if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); +} + + +static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { + SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp; + taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp); +} + + int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0ab3089f13..4cb2cd2aae 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -37,6 +37,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog rsp->vgVersion = ntohl(rsp->vgVersion); rsp->vgNum = ntohl(rsp->vgNum); rsp->uid = be64toh(rsp->uid); + + tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); if (rsp->vgVersion < 0) { SDbVgVersion dbInfo; @@ -96,6 +98,9 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs } int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; + + tscDebug("hb got %d rsp kv", kvNum); + for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pRsp->info, i); switch (kv->key) { @@ -130,28 +135,42 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs } static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { + static int32_t emptyRspNum = 0; if (code != 0) { + tfree(param); return -1; } char *key = (char *)param; SClientHbBatchRsp pRsp = {0}; tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp); - int32_t reqNum = taosArrayGetSize(pRsp.rsps); + int32_t rspNum = taosArrayGetSize(pRsp.rsps); SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { - tscError("cluster not exist, key:%s", key); + tscError("cluster not exist, key:%s", key); + tfree(param); + tFreeClientHbBatchRsp(&pRsp); return -1; } - for (int32_t i = 0; i < reqNum; ++i) { + tfree(param); + + if (rspNum) { + tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); + } else { + atomic_add_fetch_32(&emptyRspNum, 1); + } + + for (int32_t i = 0; i < rspNum; ++i) { SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); if (code) { break; } } + + tFreeClientHbBatchRsp(&pRsp); return code; } @@ -166,6 +185,10 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl return code; } + if (dbNum <= 0) { + return TSDB_CODE_SUCCESS; + } + for (int32_t i = 0; i < dbNum; ++i) { SDbVgVersion *db = &dbs[i]; db->dbId = htobe64(db->dbId); @@ -173,6 +196,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl } SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; + + tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen); + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); return TSDB_CODE_SUCCESS; @@ -213,21 +239,9 @@ static FORCE_INLINE void hbMgrInitHandle() { hbMgrInitMqHbHandle(); } - -void hbFreeReqKvHash(SHashObj* info) { - void *pIter = taosHashIterate(info, NULL); - while (pIter != NULL) { - SKv* kv = pIter; - - tfree(kv->value); - - pIter = taosHashIterate(info, pIter); - } -} - void hbFreeReq(void *req) { SClientHbReq *pReq = (SClientHbReq *)req; - hbFreeReqKvHash(pReq->info); + tFreeReqKvHash(pReq->info); } @@ -274,7 +288,7 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { while (pIter != NULL) { SClientHbReq* pOneReq = pIter; - hbFreeReqKvHash(pOneReq->info); + tFreeReqKvHash(pOneReq->info); taosHashClear(pOneReq->info); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); @@ -333,7 +347,8 @@ static void* hbThreadFunc(void* param) { int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq, false); + tFreeClientHbBatchReq(pReq, false); + hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 831006ac89..7b4b3353fb 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -18,6 +18,7 @@ #include "tname.h" #include "clientInt.h" #include "clientLog.h" +#include "catalog.h" int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); @@ -287,7 +288,6 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { - // todo: Remove cache in catalog cache. SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); @@ -295,6 +295,18 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } + SDropDbReq *req = pRequest->body.requestMsg.pData; + + SDbVgVersion dbVer = {0}; + struct SCatalog *pCatalog = NULL; + + strncpy(dbVer.dbName, req->db, sizeof(dbVer.dbName)); + dbVer.dbId = 0; //TODO GET DBID FROM RSP + + catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + + catalogRemoveDBVgroup(pCatalog, &dbVer); + tsem_post(&pRequest->body.rspSem); return code; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0d1576d03c..34b55fd812 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -114,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) { for(int i = 0; i < kvNum; i++) { SKv kv; buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen); + taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); } return buf; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 5917093c7b..94dff2e559 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -894,6 +894,8 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void * for (int32_t i = 0; i < num; ++i) { SDbVgVersion *db = &dbs[i]; + db->dbId = be64toh(db->dbId); + db->vgVersion = ntohl(db->vgVersion); len = 0; @@ -929,6 +931,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void * pRsp->vgNum = htonl(vgNum); pRsp->hashMethod = pDb->hashMethod; } else { + pRsp->uid = htobe64(db->dbId); + pRsp->vgNum = htonl(0); + pRsp->hashMethod = 0; pRsp->vgVersion = htonl(-1); } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 49265fa35e..005f9a3d3c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -370,8 +370,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { if (rspMsg && rspLen > 0) { SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; taosArrayPush(hbRsp.info, &kv); - - taosArrayPush(batchRsp.rsps, &hbRsp); } break; } @@ -380,11 +378,14 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { break; default: mError("invalid kv key:%d", kv->key); + hbRsp.status = TSDB_CODE_MND_APP_ERROR; break; } pIter = taosHashIterate(pHbReq->info, pIter); } + + taosArrayPush(batchRsp.rsps, &hbRsp); } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); if (pRsp != NULL) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index a96b12f597..7128e143bf 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -31,7 +31,7 @@ extern "C" { #define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10 -#define CTG_RENT_SLOT_SECOND 2 +#define CTG_RENT_SLOT_SECOND 1.5 #define CTG_DEFAULT_INVALID_VERSION (-1) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 07086e49ee..6b68ee87e5 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -443,9 +443,9 @@ int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { } int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { - if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) { return -1; - } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) { return 1; } else { return 0; @@ -652,7 +652,7 @@ int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t s int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { int32_t code = 0; - if (NULL == output->tbMeta) { + if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -809,20 +809,19 @@ int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* targ } CTG_LOCK(CTG_WRITE, &info->lock); + + //TODO OPEN IT +#if 0 if (info->dbId != target->dbId) { ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); CTG_UNLOCK(CTG_WRITE, &info->lock); taosHashRelease(pCatalog->dbCache.cache, info); return TSDB_CODE_SUCCESS; } - - if (info->vgVersion > target->vgVersion) { - ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion); - CTG_UNLOCK(CTG_WRITE, &info->lock); - taosHashRelease(pCatalog->dbCache.cache, info); - return TSDB_CODE_SUCCESS; - } - +#else + target->dbId = info->dbId; +#endif + if (info->vgInfo) { ctgInfo("cleanup db vgInfo, db:%s", target->dbName); taosHashCleanup(info->vgInfo); @@ -1246,6 +1245,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB dbInfo->vgInfo = NULL; SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; + strncpy(vgVersion.dbName, dbName, sizeof(vgVersion.dbName)); + if (newAdded) { CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion))); } else { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 97a0557748..3623f5947d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -186,6 +186,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; + *pQueryEnd = pDispatcher->queryEnd; } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 56919ff99e..489fff5d64 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -410,7 +410,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level") TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk") // catalog -TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog interval error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") -- GitLab