提交 78e49d6b 编写于 作者: D dapan1121

feature/qnode

上级 3aa92dec
......@@ -1391,9 +1391,26 @@ void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq);
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp);
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
void *pIter = taosHashIterate(info, NULL);
while (pIter != NULL) {
SKv* kv = (SKv*)pIter;
tfree(kv->value);
pIter = taosHashIterate(info, pIter);
}
}
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = (SClientHbReq*)pReq;
if (req->info) taosHashCleanup(req->info);
if (req->info) {
tFreeReqKvHash(req->info);
taosHashCleanup(req->info);
}
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
......@@ -1409,6 +1426,25 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
free(pReq);
}
static FORCE_INLINE void tFreeClientKv(void *pKv) {
SKv *kv = (SKv *)pKv;
if (kv) {
tfree(kv->value);
}
}
static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) {
SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
}
static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp;
taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp);
}
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp);
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp);
......
......@@ -37,6 +37,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
rsp->vgVersion = ntohl(rsp->vgVersion);
rsp->vgNum = ntohl(rsp->vgNum);
rsp->uid = be64toh(rsp->uid);
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
if (rsp->vgVersion < 0) {
SDbVgVersion dbInfo;
......@@ -96,6 +98,9 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
}
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
tscDebug("hb got %d rsp kv", kvNum);
for (int32_t i = 0; i < kvNum; ++i) {
SKv *kv = taosArrayGet(pRsp->info, i);
switch (kv->key) {
......@@ -130,28 +135,42 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
}
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t emptyRspNum = 0;
if (code != 0) {
tfree(param);
return -1;
}
char *key = (char *)param;
SClientHbBatchRsp pRsp = {0};
tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp);
int32_t reqNum = taosArrayGetSize(pRsp.rsps);
int32_t rspNum = taosArrayGetSize(pRsp.rsps);
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL || NULL == *pInst) {
tscError("cluster not exist, key:%s", key);
tscError("cluster not exist, key:%s", key);
tfree(param);
tFreeClientHbBatchRsp(&pRsp);
return -1;
}
for (int32_t i = 0; i < reqNum; ++i) {
tfree(param);
if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
} else {
atomic_add_fetch_32(&emptyRspNum, 1);
}
for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i);
code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp);
if (code) {
break;
}
}
tFreeClientHbBatchRsp(&pRsp);
return code;
}
......@@ -166,6 +185,10 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
return code;
}
if (dbNum <= 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < dbNum; ++i) {
SDbVgVersion *db = &dbs[i];
db->dbId = htobe64(db->dbId);
......@@ -173,6 +196,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
}
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs};
tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
......@@ -213,21 +239,9 @@ static FORCE_INLINE void hbMgrInitHandle() {
hbMgrInitMqHbHandle();
}
void hbFreeReqKvHash(SHashObj* info) {
void *pIter = taosHashIterate(info, NULL);
while (pIter != NULL) {
SKv* kv = pIter;
tfree(kv->value);
pIter = taosHashIterate(info, pIter);
}
}
void hbFreeReq(void *req) {
SClientHbReq *pReq = (SClientHbReq *)req;
hbFreeReqKvHash(pReq->info);
tFreeReqKvHash(pReq->info);
}
......@@ -274,7 +288,7 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
while (pIter != NULL) {
SClientHbReq* pOneReq = pIter;
hbFreeReqKvHash(pOneReq->info);
tFreeReqKvHash(pOneReq->info);
taosHashClear(pOneReq->info);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
......@@ -333,7 +347,8 @@ static void* hbThreadFunc(void* param) {
int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq, false);
tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
......
......@@ -18,6 +18,7 @@
#include "tname.h"
#include "clientInt.h"
#include "clientLog.h"
#include "catalog.h"
int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
......@@ -287,7 +288,6 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo: Remove cache in catalog cache.
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
......@@ -295,6 +295,18 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
SDropDbReq *req = pRequest->body.requestMsg.pData;
SDbVgVersion dbVer = {0};
struct SCatalog *pCatalog = NULL;
strncpy(dbVer.dbName, req->db, sizeof(dbVer.dbName));
dbVer.dbId = 0; //TODO GET DBID FROM RSP
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveDBVgroup(pCatalog, &dbVer);
tsem_post(&pRequest->body.rspSem);
return code;
}
......
......@@ -114,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
for(int i = 0; i < kvNum; i++) {
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen);
taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
}
return buf;
......
......@@ -894,6 +894,8 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
for (int32_t i = 0; i < num; ++i) {
SDbVgVersion *db = &dbs[i];
db->dbId = be64toh(db->dbId);
db->vgVersion = ntohl(db->vgVersion);
len = 0;
......@@ -929,6 +931,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
pRsp->vgNum = htonl(vgNum);
pRsp->hashMethod = pDb->hashMethod;
} else {
pRsp->uid = htobe64(db->dbId);
pRsp->vgNum = htonl(0);
pRsp->hashMethod = 0;
pRsp->vgVersion = htonl(-1);
}
......
......@@ -370,8 +370,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv);
taosArrayPush(batchRsp.rsps, &hbRsp);
}
break;
}
......@@ -380,11 +378,14 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
break;
default:
mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_MND_APP_ERROR;
break;
}
pIter = taosHashIterate(pHbReq->info, pIter);
}
taosArrayPush(batchRsp.rsps, &hbRsp);
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
if (pRsp != NULL) {
......
......@@ -31,7 +31,7 @@ extern "C" {
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_RENT_SLOT_SECOND 2
#define CTG_RENT_SLOT_SECOND 1.5
#define CTG_DEFAULT_INVALID_VERSION (-1)
......
......@@ -443,9 +443,9 @@ int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
}
int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
return -1;
} else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
} else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
return 1;
} else {
return 0;
......@@ -652,7 +652,7 @@ int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t s
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
int32_t code = 0;
if (NULL == output->tbMeta) {
if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -809,20 +809,19 @@ int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* targ
}
CTG_LOCK(CTG_WRITE, &info->lock);
//TODO OPEN IT
#if 0
if (info->dbId != target->dbId) {
ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId);
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
return TSDB_CODE_SUCCESS;
}
if (info->vgVersion > target->vgVersion) {
ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion);
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
return TSDB_CODE_SUCCESS;
}
#else
target->dbId = info->dbId;
#endif
if (info->vgInfo) {
ctgInfo("cleanup db vgInfo, db:%s", target->dbName);
taosHashCleanup(info->vgInfo);
......@@ -1246,6 +1245,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
dbInfo->vgInfo = NULL;
SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
strncpy(vgVersion.dbName, dbName, sizeof(vgVersion.dbName));
if (newAdded) {
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
} else {
......
......@@ -186,6 +186,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
*pQueryEnd = pDispatcher->queryEnd;
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
......
......@@ -410,7 +410,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk")
// catalog
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog interval error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册