提交 21a5e4f9 编写于 作者: D dapan1121

fix hb issue

上级 c04bc182
...@@ -70,13 +70,11 @@ typedef uint16_t tmsg_t; ...@@ -70,13 +70,11 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
typedef enum { enum {
HEARTBEAT_TYPE_MQ = 0, CONN_TYPE__QUERY = 1,
HEARTBEAT_TYPE_QUERY, CONN_TYPE__TMQ,
// types can be added here CONN_TYPE__MAX
// };
HEARTBEAT_TYPE_MAX
} EHbType;
enum { enum {
HEARTBEAT_KEY_DBINFO = 1, HEARTBEAT_KEY_DBINFO = 1,
...@@ -1649,7 +1647,7 @@ typedef struct { ...@@ -1649,7 +1647,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t tscRid; int64_t tscRid;
int32_t hbType; int8_t connType;
} SClientHbKey; } SClientHbKey;
typedef struct { typedef struct {
...@@ -1796,13 +1794,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { ...@@ -1796,13 +1794,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) { static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1; if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1;
if (tEncodeI32(pEncoder, pKey->hbType) < 0) return -1; if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1;
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) { static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1; if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1;
if (tDecodeI32(pDecoder, &pKey->hbType) < 0) return -1; if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1;
return 0; return 0;
} }
......
...@@ -45,11 +45,6 @@ extern "C" { ...@@ -45,11 +45,6 @@ extern "C" {
#define HEARTBEAT_INTERVAL 1500 // ms #define HEARTBEAT_INTERVAL 1500 // ms
enum {
CONN_TYPE__QUERY = 1,
CONN_TYPE__TMQ,
};
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
typedef struct { typedef struct {
...@@ -84,8 +79,8 @@ typedef struct { ...@@ -84,8 +79,8 @@ typedef struct {
TdThread thread; TdThread thread;
TdThreadMutex lock; // used when app init and cleanup TdThreadMutex lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX]; FHbReqHandle reqHandle[CONN_TYPE__MAX];
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; FHbRspHandle rspHandle[CONN_TYPE__MAX];
} SClientHbMgr; } SClientHbMgr;
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
...@@ -307,7 +302,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); ...@@ -307,7 +302,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void); void appHbMgrCleanup(void);
// conn level // conn level
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int32_t hbType); int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
......
...@@ -122,7 +122,7 @@ void closeAllRequests(SHashObj *pRequests) { ...@@ -122,7 +122,7 @@ void closeAllRequests(SHashObj *pRequests) {
void destroyTscObj(void *pObj) { void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj; STscObj *pTscObj = pObj;
SClientHbKey connKey = {.tscRid = pTscObj->id, .hbType = pTscObj->connType}; SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests); closeAllRequests(pTscObj->pRequests);
......
...@@ -110,7 +110,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo ...@@ -110,7 +110,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) { if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.hbType); tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.connType);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -224,7 +224,7 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) ...@@ -224,7 +224,7 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code)
for (int32_t i = 0; i < rspNum; ++i) { for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i); SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); code = (*clientHbMgr.rspHandle[rsp->connKey.connType])((*pInst)->pAppHbMgr, rsp);
if (code) { if (code) {
break; break;
} }
...@@ -420,11 +420,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req ...@@ -420,11 +420,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
} }
void hbMgrInitMqHbHandle() { void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle; clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle; clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle;
clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
} }
static FORCE_INLINE void hbMgrInitHandle() { static FORCE_INLINE void hbMgrInitHandle() {
...@@ -458,7 +458,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -458,7 +458,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
if (info) { if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq);
if (code) { if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
continue; continue;
...@@ -692,22 +692,22 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * ...@@ -692,22 +692,22 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
return 0; return 0;
} }
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int32_t hbType) { int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) {
SClientHbKey connKey = { SClientHbKey connKey = {
.tscRid = tscRefId, .tscRid = tscRefId,
.hbType = hbType, .connType = connType,
}; };
SHbConnInfo info = {0}; SHbConnInfo info = {0};
switch (hbType) { switch (connType) {
case HEARTBEAT_TYPE_QUERY: { case CONN_TYPE__QUERY: {
int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t)); int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t));
*pClusterId = clusterId; *pClusterId = clusterId;
info.param = pClusterId; info.param = pClusterId;
return hbRegisterConnImpl(pAppHbMgr, connKey, &info); return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
} }
case HEARTBEAT_TYPE_MQ: { case CONN_TYPE__TMQ: {
return 0; return 0;
} }
default: default:
......
...@@ -3424,5 +3424,4 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -3424,5 +3424,4 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast); taosMemoryFreeClear(pReq->ast)
} \ No newline at end of file
...@@ -343,7 +343,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -343,7 +343,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) { if (pConn == NULL) {
pConn = mndCreateConn(pMnode, connInfo.user, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0); pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr()); mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
return -1; return -1;
...@@ -451,9 +451,9 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { ...@@ -451,9 +451,9 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
int32_t sz = taosArrayGetSize(batchReq.reqs); int32_t sz = taosArrayGetSize(batchReq.reqs);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i); SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
mndProcessQueryHeartBeat(pMnode, &pReq->rpcMsg, pHbReq, &batchRsp); mndProcessQueryHeartBeat(pMnode, &pReq->rpcMsg, pHbReq, &batchRsp);
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
if (pRsp != NULL) { if (pRsp != NULL) {
taosArrayPush(batchRsp.rsps, pRsp); taosArrayPush(batchRsp.rsps, pRsp);
......
...@@ -99,7 +99,7 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { ...@@ -99,7 +99,7 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
SClientHbBatchReq batchReq = {0}; SClientHbBatchReq batchReq = {0};
batchReq.reqs = taosArrayInit(0, sizeof(SClientHbReq)); batchReq.reqs = taosArrayInit(0, sizeof(SClientHbReq));
SClientHbReq req = {0}; SClientHbReq req = {0};
req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ}; req.connKey = {.connId = 123, .hbType = CONN_TYPE__TMQ};
req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
SKv kv = {0}; SKv kv = {0};
kv.key = 123; kv.key = 123;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册