提交 d7660484 编写于 作者: L Liu Jicong

fix(tmq): performance issue

上级 18993d50
......@@ -35,6 +35,7 @@ enum {
TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP,
TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__END_RSP,
};
typedef enum EStreamType {
......
......@@ -429,7 +429,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09)
#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A)
#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B)
#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0B)
#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0C)
#define TSDB_CODE_TQ_NO_COMMITTED_OFFSET TAOS_DEF_ERROR_CODE(0, 0x0A0D)
// wal
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000)
......
......@@ -983,6 +983,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (code != 0) {
tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData);
tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
goto CREATE_MSG_FAIL;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
/*pRspWrapper->vgHandle = pVg;*/
/*pRspWrapper->topicHandle = pTopic;*/
taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
}
goto CREATE_MSG_FAIL;
}
......@@ -1115,7 +1128,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
return set;
}
#if 0
#if 1
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/
bool set = false;
......@@ -1503,7 +1516,11 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (rspWrapper == NULL) return NULL;
}
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
......@@ -1564,6 +1581,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
return NULL;
}
if (timeout != -1) {
int64_t endTime = taosGetTimestampMs();
......
......@@ -169,6 +169,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId,
pTq->pVnode->config.vgId, pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
}
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
......
......@@ -572,6 +572,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
//tq
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset")
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册