diff --git a/include/util/taoserror.h b/include/util/taoserror.h index f37402c18c0ebe8cbe9cac00f18d9880fe90e286..9c5c6333589f5126973ab4651000e429498f9e50 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -780,6 +780,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009) #define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010) #define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011) +#define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index aba127c36e142dedc0af1b4571f1909a2a877a5a..ae4651bd45cbd81fbe03c663438bf142e6763a45 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -586,10 +586,14 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq if(code != 0){ goto end; } - if (offsetVal->type <= 0 || tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { + if (offsetVal->type <= 0) { code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } + if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)){ + code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; + goto end; + } char offsetBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal); @@ -653,6 +657,7 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c end: if(code != TSDB_CODE_SUCCESS && pCommitFp != NULL){ + if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; pCommitFp(tmq, code, userParam); } } @@ -2412,6 +2417,7 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, code = pInfo->code; } + if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); @@ -2456,6 +2462,7 @@ void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, i end: if(code != 0 && cb != NULL){ + if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; cb(tmq, code, param); } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 83a50f70515accfe9ae642de104ae3f82efa138e..64d757144b838a933f1b949957385660cf76d164 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -642,6 +642,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")