提交 96481174 编写于 作者: wmmhello's avatar wmmhello

fix:can not do assignment if in tsdb mode

上级 8904f385
......@@ -771,6 +771,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003)
#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004)
#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005)
#define TSDB_CODE_TMQ_SNAPSHOT_ERROR TAOS_DEF_ERROR_CODE(0, 0x4006)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
......
......@@ -2576,6 +2576,15 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
// in case of snapshot is opened, no valid offset will return
*numOfAssignment = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if ((pClientVg->offsetInfo.currentOffset.type < TMQ_OFFSET__LOG && tmq->useSnapshot) ||
pClientVg->offsetInfo.currentOffset.type > TMQ_OFFSET__LOG) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, pClientVg->offsetInfo.currentOffset.type);
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
goto end;
}
}
*assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
if (*assignment == NULL) {
......@@ -2783,7 +2792,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
......
......@@ -628,6 +628,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval
//tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册