提交 15dd2721 编写于 作者: L Liu Jicong

enh(tmq): do not show closed consumer

上级 3b56457f
...@@ -1493,7 +1493,7 @@ typedef struct { ...@@ -1493,7 +1493,7 @@ typedef struct {
} SMVSubscribeRsp; } SMVSubscribeRsp;
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
int8_t igNotExists; int8_t igNotExists;
} SMDropTopicReq; } SMDropTopicReq;
......
...@@ -1307,15 +1307,19 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { ...@@ -1307,15 +1307,19 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
} }
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
tmq_list_t* lst = tmq_list_new(); if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); tmq_list_t* lst = tmq_list_new();
tmq_list_destroy(lst); tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
if (rsp == TMQ_RESP_ERR__SUCCESS) { tmq_list_destroy(lst);
// TODO: free resources if (rsp == TMQ_RESP_ERR__SUCCESS) {
return TMQ_RESP_ERR__SUCCESS; // TODO: free resources
} else { return TMQ_RESP_ERR__SUCCESS;
return TMQ_RESP_ERR__FAIL; } else {
return TMQ_RESP_ERR__FAIL;
}
} }
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} }
const char* tmq_err2str(tmq_resp_err_t err) { const char* tmq_err2str(tmq_resp_err_t err) {
......
...@@ -29,6 +29,7 @@ enum { ...@@ -29,6 +29,7 @@ enum {
MQ_CONSUMER_STATUS__LOST, MQ_CONSUMER_STATUS__LOST,
MQ_CONSUMER_STATUS__LOST_IN_REB, MQ_CONSUMER_STATUS__LOST_IN_REB,
MQ_CONSUMER_STATUS__LOST_REBD, MQ_CONSUMER_STATUS__LOST_REBD,
MQ_CONSUMER_STATUS__REMOVED,
}; };
int32_t mndInitConsumer(SMnode *pMnode); int32_t mndInitConsumer(SMnode *pMnode);
......
...@@ -486,6 +486,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { ...@@ -486,6 +486,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
} }
} }
if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
/*pConsumerNew->updateType = */
/*}*/
goto SUBSCRIBE_OVER;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER; if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
...@@ -684,9 +692,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -684,9 +692,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY; pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
// TODO: remove
/*if (taosArrayGetSize(pOldConsumer->assignedTopics) == 0) {*/
/*}*/
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) { pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
......
...@@ -331,12 +331,6 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -331,12 +331,6 @@ void loop_consume(SThreadInfo* pInfo) {
} }
} }
err = tmq_consumer_close(pInfo->tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeMsgCnt = totalMsgs;
pInfo->consumeRowCnt = totalRows; pInfo->consumeRowCnt = totalRows;
...@@ -372,6 +366,13 @@ void* consumeThreadFunc(void* param) { ...@@ -372,6 +366,13 @@ void* consumeThreadFunc(void* param) {
return NULL; return NULL;
} }
err = tmq_consumer_close(pInfo->tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
pInfo->tmq = NULL;
// save consume result into consumeresult table // save consume result into consumeresult table
saveConsumeResult(pInfo); saveConsumeResult(pInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册