提交 1e845aca 编写于 作者: H Haojun Liao

fix(tmq): allow seek before get assignment

上级 ccbea130
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000
#define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0)
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam); typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
struct SMqMgmt { struct SMqMgmt {
...@@ -2626,12 +2628,12 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ ...@@ -2626,12 +2628,12 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
int32_t type = pOffsetInfo->currentOffset.type; int32_t type = pOffsetInfo->currentOffset.type;
if (type != TMQ_OFFSET__LOG) { if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
......
...@@ -1280,6 +1280,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1280,6 +1280,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
return 0; return 0;
} }
......
...@@ -268,12 +268,14 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -268,12 +268,14 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (*pTaskId == taskId) { if (*pTaskId == taskId) {
...@@ -283,6 +285,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -283,6 +285,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册