提交 bb58ec1e 编写于 作者: L Liu Jicong

feat(tmq): support background heartbeat

上级 280ed4f6
...@@ -75,13 +75,18 @@ typedef uint16_t tmsg_t; ...@@ -75,13 +75,18 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX }; enum {
CONN_TYPE__QUERY = 1,
CONN_TYPE__TMQ,
CONN_TYPE__UDFD,
CONN_TYPE__MAX,
};
enum { enum {
HEARTBEAT_KEY_USER_AUTHINFO = 1, HEARTBEAT_KEY_USER_AUTHINFO = 1,
HEARTBEAT_KEY_DBINFO, HEARTBEAT_KEY_DBINFO,
HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_STBINFO,
HEARTBEAT_KEY_MQ_TMP, HEARTBEAT_KEY_TMQ,
}; };
typedef enum _mgmt_table { typedef enum _mgmt_table {
...@@ -2145,6 +2150,15 @@ typedef struct { ...@@ -2145,6 +2150,15 @@ typedef struct {
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
} SMqAskEpReq; } SMqAskEpReq;
typedef struct {
int64_t consumerId;
int32_t epoch;
} SMqHbReq;
typedef struct {
int8_t reserved;
} SMqHbRsp;
typedef struct { typedef struct {
int32_t key; int32_t key;
int32_t valueLen; int32_t valueLen;
...@@ -2333,29 +2347,30 @@ static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey ...@@ -2333,29 +2347,30 @@ static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey
return 0; return 0;
} }
typedef struct SMqHbVgInfo { typedef struct {
int32_t vgId; int32_t vgId;
} SMqHbVgInfo; // TODO stas
} SMqReportVgInfo;
static FORCE_INLINE int32_t taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { static FORCE_INLINE int32_t taosEncodeSMqVgInfo(void** buf, const SMqReportVgInfo* pVgInfo) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
return tlen; return tlen;
} }
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqReportVgInfo* pVgInfo) {
buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); buf = taosDecodeFixedI32(buf, &pVgInfo->vgId);
return buf; return buf;
} }
typedef struct SMqHbTopicInfo { typedef struct {
int32_t epoch; int32_t epoch;
int64_t topicUid; int64_t topicUid;
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
SArray* pVgInfo; // SArray<SMqHbVgInfo> SArray* pVgInfo; // SArray<SMqHbVgInfo>
} SMqHbTopicInfo; } SMqTopicInfo;
static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { static FORCE_INLINE int32_t taosEncodeSMqTopicInfoMsg(void** buf, const SMqTopicInfo* pTopicInfo) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
...@@ -2363,35 +2378,35 @@ static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbT ...@@ -2363,35 +2378,35 @@ static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbT
int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); SMqReportVgInfo* pVgInfo = (SMqReportVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
tlen += taosEncodeSMqVgInfo(buf, pVgInfo); tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
} }
return tlen; return tlen;
} }
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { static FORCE_INLINE void* taosDecodeSMqTopicInfoMsg(void* buf, SMqTopicInfo* pTopicInfo) {
buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
buf = taosDecodeStringTo(buf, pTopicInfo->name); buf = taosDecodeStringTo(buf, pTopicInfo->name);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqReportVgInfo));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqHbVgInfo vgInfo; SMqReportVgInfo vgInfo;
buf = taosDecodeSMqVgInfo(buf, &vgInfo); buf = taosDecodeSMqVgInfo(buf, &vgInfo);
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
} }
return buf; return buf;
} }
typedef struct SMqHbMsg { typedef struct {
int32_t status; // ask hb endpoint int32_t status; // ask hb endpoint
int32_t epoch; int32_t epoch;
int64_t consumerId; int64_t consumerId;
SArray* pTopics; // SArray<SMqHbTopicInfo> SArray* pTopics; // SArray<SMqHbTopicInfo>
} SMqHbMsg; } SMqReportReq;
static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { static FORCE_INLINE int32_t taosEncodeSMqReportMsg(void** buf, const SMqReportReq* pMsg) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pMsg->status); tlen += taosEncodeFixedI32(buf, pMsg->status);
tlen += taosEncodeFixedI32(buf, pMsg->epoch); tlen += taosEncodeFixedI32(buf, pMsg->epoch);
...@@ -2399,22 +2414,22 @@ static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { ...@@ -2399,22 +2414,22 @@ static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
int32_t sz = taosArrayGetSize(pMsg->pTopics); int32_t sz = taosArrayGetSize(pMsg->pTopics);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); SMqTopicInfo* topicInfo = (SMqTopicInfo*)taosArrayGet(pMsg->pTopics, i);
tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); tlen += taosEncodeSMqTopicInfoMsg(buf, topicInfo);
} }
return tlen; return tlen;
} }
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { static FORCE_INLINE void* taosDecodeSMqReportMsg(void* buf, SMqReportReq* pMsg) {
buf = taosDecodeFixedI32(buf, &pMsg->status); buf = taosDecodeFixedI32(buf, &pMsg->status);
buf = taosDecodeFixedI32(buf, &pMsg->epoch); buf = taosDecodeFixedI32(buf, &pMsg->epoch);
buf = taosDecodeFixedI64(buf, &pMsg->consumerId); buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); pMsg->pTopics = taosArrayInit(sz, sizeof(SMqTopicInfo));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqHbTopicInfo topicInfo; SMqTopicInfo topicInfo;
buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); buf = taosDecodeSMqTopicInfoMsg(buf, &topicInfo);
taosArrayPush(pMsg->pTopics, &topicInfo); taosArrayPush(pMsg->pTopics, &topicInfo);
} }
return buf; return buf;
...@@ -2919,89 +2934,6 @@ typedef struct { ...@@ -2919,89 +2934,6 @@ typedef struct {
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
#if 0
typedef struct {
SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
SArray* blockDataLen; // SArray<int32_t>
SArray* blockData; // SArray<SRetrieveTableRsp*>
SArray* blockTbName; // SArray<char*>
SArray* blockSchema; // SArray<SSchemaWrapper>
} SMqDataBlkRsp;
static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->blockNum);
if (pRsp->blockNum != 0) {
tlen += taosEncodeFixedI8(buf, pRsp->withTbName);
tlen += taosEncodeFixedI8(buf, pRsp->withSchema);
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i);
void* data = taosArrayGetP(pRsp->blockData, i);
tlen += taosEncodeFixedI32(buf, bLen);
tlen += taosEncodeBinary(buf, data, bLen);
if (pRsp->withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
tlen += taosEncodeSSchemaWrapper(buf, pSW);
}
if (pRsp->withTbName) {
char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i);
tlen += taosEncodeString(buf, tbName);
}
}
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
if (pRsp->blockNum != 0) {
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
}
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
}
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = 0;
void* data = NULL;
buf = taosDecodeFixedI32(buf, &bLen);
buf = taosDecodeBinary(buf, &data, bLen);
taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data);
if (pRsp->withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW);
}
if (pRsp->withTbName) {
char* name = NULL;
buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name);
}
}
}
return (void*)buf;
}
#endif
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
......
...@@ -144,6 +144,7 @@ enum { ...@@ -144,6 +144,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_HB, "consumer-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
......
...@@ -24,9 +24,15 @@ static SClientHbMgr clientHbMgr = {0}; ...@@ -24,9 +24,15 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread(); static int32_t hbCreateThread();
static void hbStopThread(); static void hbStopThread();
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
//
return 0;
}
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
//
return 0;
}
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0; int32_t code = 0;
...@@ -158,12 +164,12 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { ...@@ -158,12 +164,12 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid); tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
} else { } else {
if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) { if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) {
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet *pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse]; SEp *pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse]; SEp *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pOrig->inUse, pOrig->numOfEps,
pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port, pOrigEp->fqdn, pOrigEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn,
pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn, pNewEp->port); pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
} }
...@@ -392,7 +398,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -392,7 +398,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
int32_t code = hbBuildQueryDesc(hbBasic, pTscObj); int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
if (code) { if (code) {
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
...@@ -442,7 +447,6 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S ...@@ -442,7 +447,6 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SDbVgVersion *dbs = NULL; SDbVgVersion *dbs = NULL;
uint32_t dbNum = 0; uint32_t dbNum = 0;
...@@ -521,7 +525,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC ...@@ -521,7 +525,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
} }
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL != pApp) { if (NULL != pApp) {
memcpy(&req->app, pApp, sizeof(*pApp)); memcpy(&req->app, pApp, sizeof(*pApp));
} else { } else {
...@@ -534,7 +538,6 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { ...@@ -534,7 +538,6 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
int64_t *clusterId = (int64_t *)param; int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
...@@ -567,7 +570,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req ...@@ -567,7 +570,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void hbMgrInitMqHbHandle() { static FORCE_INLINE void hbMgrInitHandle() {
// init all handle
clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle; clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle; clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle;
...@@ -575,11 +579,6 @@ void hbMgrInitMqHbHandle() { ...@@ -575,11 +579,6 @@ void hbMgrInitMqHbHandle() {
clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle; clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
} }
static FORCE_INLINE void hbMgrInitHandle() {
// init all handle
hbMgrInitMqHbHandle();
}
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq)); SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) { if (pBatchReq == NULL) {
...@@ -602,7 +601,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -602,7 +601,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
continue; continue;
} }
//hbClearClientHbReq(pOneReq); // hbClearClientHbReq(pOneReq);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
} }
...@@ -615,11 +614,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -615,11 +614,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return pBatchReq; return pBatchReq;
} }
void hbThreadFuncUnexpectedStopped(void) { void hbThreadFuncUnexpectedStopped(void) { atomic_store_8(&clientHbMgr.threadStop, 2); }
atomic_store_8(&clientHbMgr.threadStop, 2);
}
void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) { void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
dst->numOfInsertsReq += src->numOfInsertsReq; dst->numOfInsertsReq += src->numOfInsertsReq;
dst->numOfInsertRows += src->numOfInsertRows; dst->numOfInsertRows += src->numOfInsertRows;
dst->insertElapsedTime += src->insertElapsedTime; dst->insertElapsedTime += src->insertElapsedTime;
...@@ -645,7 +642,7 @@ int32_t hbGatherAppInfo(void) { ...@@ -645,7 +642,7 @@ int32_t hbGatherAppInfo(void) {
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) { if (NULL == pApp) {
memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
req.startTime = pAppHbMgr->startTime; req.startTime = pAppHbMgr->startTime;
...@@ -662,7 +659,6 @@ int32_t hbGatherAppInfo(void) { ...@@ -662,7 +659,6 @@ int32_t hbGatherAppInfo(void) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void *hbThreadFunc(void *param) { static void *hbThreadFunc(void *param) {
setThreadName("hb"); setThreadName("hb");
#ifdef WINDOWS #ifdef WINDOWS
...@@ -698,7 +694,6 @@ static void *hbThreadFunc(void *param) { ...@@ -698,7 +694,6 @@ static void *hbThreadFunc(void *param) {
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr);
break; break;
} }
...@@ -708,7 +703,6 @@ static void *hbThreadFunc(void *param) { ...@@ -708,7 +703,6 @@ static void *hbThreadFunc(void *param) {
if (pInfo == NULL) { if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf); taosMemoryFree(buf);
break; break;
} }
...@@ -725,7 +719,6 @@ static void *hbThreadFunc(void *param) { ...@@ -725,7 +719,6 @@ static void *hbThreadFunc(void *param) {
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
} }
...@@ -881,7 +874,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clust ...@@ -881,7 +874,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clust
SClientHbReq hbReq = {0}; SClientHbReq hbReq = {0};
hbReq.connKey = connKey; hbReq.connKey = connKey;
hbReq.clusterId = clusterId; hbReq.clusterId = clusterId;
//hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); // hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
...@@ -896,12 +889,9 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in ...@@ -896,12 +889,9 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
}; };
switch (connType) { switch (connType) {
case CONN_TYPE__QUERY: { case CONN_TYPE__QUERY:
case CONN_TYPE__TMQ:
return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId); return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId);
}
case CONN_TYPE__TMQ: {
return 0;
}
default: default:
return 0; return 0;
} }
......
此差异已折叠。
...@@ -196,6 +196,7 @@ SArray *mmGetMsgHandles() { ...@@ -196,6 +196,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_HB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -48,6 +48,7 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); ...@@ -48,6 +48,7 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg); static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg); static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
...@@ -62,6 +63,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { ...@@ -62,6 +63,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; .deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
...@@ -255,6 +257,33 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -255,6 +257,33 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqHbReq *pReq = (SMqHbReq *)pMsg->pCont;
int64_t consumerId = be64toh(pReq->consumerId);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
atomic_store_32(&pConsumer->hbStatus, 0);
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId;
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER;
pRpcMsg->pCont = pRecoverMsg;
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
}
mndReleaseConsumer(pMnode, pConsumer);
return 0;
}
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont; SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont;
...@@ -262,19 +291,22 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -262,19 +291,22 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
int64_t consumerId = be64toh(pReq->consumerId); int64_t consumerId = be64toh(pReq->consumerId);
int32_t epoch = ntohl(pReq->epoch); int32_t epoch = ntohl(pReq->epoch);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->info.node, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1; return -1;
} }
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
/*int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);*/
#if 1
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
#endif
// 1. check consumer status // 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
#if 0
if (status == MQ_CONSUMER_STATUS__LOST_REBD) { if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
...@@ -285,6 +317,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -285,6 +317,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg); pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
} }
#endif
if (status != MQ_CONSUMER_STATUS__READY) { if (status != MQ_CONSUMER_STATUS__READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndProfile.h" #include "mndProfile.h"
#include "mndPrivilege.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndQnode.h" #include "mndQnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
...@@ -387,67 +387,7 @@ static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) { ...@@ -387,67 +387,7 @@ static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
} }
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
#if 0 //
SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRsp->connKey = pReq->connKey;
SMqHbBatchRsp batchRsp;
batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
if (batchRsp.batchRsps == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SClientHbKey connKey = pReq->connKey;
SHashObj* pObj = pReq->info;
SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
if (pKv == NULL) {
taosMemoryFree(pRsp);
return NULL;
}
SMqHbMsg mqHb;
taosDecodeSMqMsg(pKv->value, &mqHb);
/*int64_t clientUid = htonl(pKv->value);*/
/*if (mqHb.epoch )*/
int sz = taosArrayGetSize(mqHb.pTopics);
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId);
for (int i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp innerBatchRsp;
innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
if (innerBatchRsp.rsps == NULL) {
//TODO
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
if (pConsumerTopic->epoch != topicInfo->epoch) {
//add new vgids into rsp
int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
for (int j = 0; j < vgSz; j++) {
SMqHbRsp innerRsp;
SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
taosArrayPush(innerBatchRsp.rsps, &innerRsp);
}
}
taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
}
int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
//TODO
return NULL;
}
void* abuf = buf;
taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
pRsp->body = buf;
pRsp->bodyLen = tlen;
return pRsp;
#endif
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册