提交 09b86cbf 编写于 作者: D dapan1121

fix bugs

上级 03a366fa
......@@ -76,9 +76,10 @@ typedef enum {
HEARTBEAT_TYPE_MAX
} EHbType;
typedef enum {
enum {
HEARTBEAT_KEY_DBINFO = 1,
HEARTBEAT_KEY_STBINFO,
HEARTBEAT_KEY_MQ_TMP,
};
......
......@@ -99,6 +99,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo);
/**
* Get a table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
......
......@@ -35,10 +35,6 @@ extern "C" {
typedef struct SAppInstInfo SAppInstInfo;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req);
typedef struct SHbConnInfo {
void *param;
SClientHbReq *req;
......@@ -60,6 +56,12 @@ typedef struct SAppHbMgr {
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
} SAppHbMgr;
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbReqHandle)(SClientHbKey *connKey, void* param, SClientHbReq *req);
typedef struct SClientHbMgr {
int8_t inited;
// ctl
......@@ -223,11 +225,11 @@ void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo);
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
......
......@@ -15,6 +15,8 @@
#include "clientInt.h"
#include "trpc.h"
#include "catalog.h"
#include "clientLog.h"
static SClientHbMgr clientHbMgr = {0};
......@@ -25,6 +27,67 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp)
return 0;
}
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0;
int32_t code = 0;
while (msgLen < valueLen) {
SUseDbRsp *rsp = (SUseDbRsp *)((char *)value + msgLen);
rsp->vgVersion = ntohl(rsp->vgVersion);
rsp->vgNum = ntohl(rsp->vgNum);
rsp->uid = be64toh(rsp->uid);
if (rsp->vgVersion < 0) {
SDbVgVersion dbInfo;
strcpy(dbInfo.dbName, rsp->db);
dbInfo.dbId = rsp->uid;
dbInfo.vgVersion = rsp->vgVersion;
code = catalogRemoveDBVgroup(pCatalog, &dbInfo);
} else {
SDBVgroupInfo vgInfo = {0};
vgInfo.dbId = rsp->uid;
vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgInfo) {
tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < rsp->vgNum; ++i) {
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId);
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin);
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < rsp->vgroupInfo[i].numOfEps; ++n) {
rsp->vgroupInfo[i].epAddr[n].port = ntohs(rsp->vgroupInfo[i].epAddr[n].port);
}
if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgInfo);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
if (code) {
taosHashCleanup(vgInfo.vgInfo);
}
}
if (code) {
return code;
}
msgLen += sizeof(SUseDbRsp) + rsp->vgNum * sizeof(SVgroupInfo);
}
return TSDB_CODE_SUCCESS;
}
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
......@@ -36,9 +99,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
for (int32_t i = 0; i < kvNum; ++i) {
SKv *kv = taosArrayGet(pRsp->info, i);
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO:
case HEARTBEAT_KEY_DBINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
break;
}
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
case HEARTBEAT_KEY_STBINFO:
break;
......@@ -56,8 +134,10 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
return -1;
}
char *key = (char *)param;
SClientHbBatchRsp* pRsp = (SClientHbBatchRsp*) pMsg->pData;
int32_t reqNum = taosArrayGetSize(pRsp->rsps);
SClientHbBatchRsp pRsp = {0};
tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp);
int32_t reqNum = taosArrayGetSize(pRsp.rsps);
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL || NULL == *pInst) {
......@@ -66,7 +146,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
}
for (int32_t i = 0; i < reqNum; ++i) {
SClientHbRsp* rsp = taosArrayGet(pRsp->rsps, i);
SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i);
code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp);
if (code) {
break;
......@@ -145,8 +225,9 @@ void hbFreeReqKvHash(SHashObj* info) {
}
}
void hbFreeReq(SClientHbReq *req) {
hbFreeReqKvHash(req->info);
void hbFreeReq(void *req) {
SClientHbReq *pReq = (SClientHbReq *)req;
hbFreeReqKvHash(pReq->info);
}
......@@ -305,9 +386,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
}
pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
// init getInfoFunc
pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (pAppHbMgr->getInfoFuncs == NULL) {
if (pAppHbMgr->connInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
free(pAppHbMgr);
return NULL;
......@@ -325,7 +406,7 @@ void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == pTarget) {
taosHashCleanup(pTarget->activeInfo);
taosHashCleanup(pTarget->getInfoFuncs);
taosHashCleanup(pTarget->connInfo);
}
}
......@@ -357,23 +438,6 @@ void hbMgrCleanUp() {
taosArrayDestroy(clientHbMgr.appHbMgrs);
}
int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
int64_t reqId = hbRsp->reqId;
int64_t rspId = hbRsp->rspId;
SArray* rsps = hbRsp->rsps;
int32_t sz = taosArrayGetSize(rsps);
for (int i = 0; i < sz; i++) {
SClientHbRsp* pRsp = taosArrayGet(rsps, i);
if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) {
clientHbMgr.handle[pRsp->connKey.hbType](pRsp);
} else {
// discard rsp
}
}
return 0;
}
int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
// init hash in activeinfo
void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
......@@ -421,7 +485,7 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey));
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}
......
......@@ -434,13 +434,8 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
return NULL;
}
SKv kv = {0};
kv.key = malloc(256);
if (kv.key == NULL) {
taosArrayDestroy(pArray);
return NULL;
}
strcpy(kv.key, "mq-tmp");
kv.keyLen = strlen("mq-tmp") + 1;
kv.key = HEARTBEAT_KEY_MQ_TMP;
SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
if (pMqHb == NULL) {
return pArray;
......
......@@ -114,7 +114,7 @@ void *tDeserializeSClientHbReq(void *buf, SClientHbReq *pReq) {
for(int i = 0; i < kvNum; i++) {
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, kv.key, sizeof(kv.key), kv.value, kv.valueLen);
taosHashPut(pReq->info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen);
}
return buf;
......
......@@ -26,6 +26,7 @@ int32_t mndInitDb(SMnode *pMnode);
void mndCleanupDb(SMnode *pMnode);
SDbObj *mndAcquireDb(SMnode *pMnode, char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen);
#ifdef __cplusplus
}
......
......@@ -346,8 +346,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
int sz = taosArrayGetSize(pArray);
SClientHbBatchRsp batchRsp = {0};
batchRsp.key = batchReq.key;
batchRsp.keyLen = batchReq.keyLen;
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) {
......@@ -365,17 +363,18 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
SKv* kv = pIter;
switch (kv->key) {
case HEARTBEAT_KEY_DBINFO:
case HEARTBEAT_KEY_DBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateDBInfo(pMnode, (SDbVgVersion *)kv->value, kv->valueLen/sizeof(SDbVgVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp->info, &kv);
taosArrayPush(hbRsp.info, &kv);
taosArrayPush(batchRsp.rsps, &hbRsp);
}
break;
}
case HEARTBEAT_KEY_STBINFO:
break;
......@@ -412,7 +411,6 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
taosArrayDestroy(rsp->info);
}
tfree(batchRsp.key);
taosArrayDestroy(batchRsp.rsps);
pReq->contLen = tlen;
pReq->pCont = buf;
......
......@@ -102,11 +102,10 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ};
req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
SKv kv;
kv.key = (void*)"abc";
kv.keyLen = 4;
kv.key = 123;
kv.value = (void*)"bcd";
kv.valueLen = 4;
taosHashPut(req.info, kv.key, kv.keyLen, kv.value, kv.valueLen);
taosHashPut(req.info, &kv.key, sizeof(kv.key), kv.value, kv.valueLen);
taosArrayPush(batchReq.reqs, &req);
int32_t tlen = tSerializeSClientHbBatchReq(NULL, &batchReq);
......
......@@ -1236,6 +1236,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
}
bool newAdded = false;
dbInfo->lock = 0;
if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
......@@ -1269,7 +1271,7 @@ int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) {
bool removed = false;
if (NULL == pCatalog || NULL == dbInfo) {
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCatalog->dbCache.cache) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册