提交 f8007803 编写于 作者: wmmhello's avatar wmmhello

fix:error in TD-23218 & remove useless logic

上级 f94b1df5
...@@ -198,6 +198,8 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit); ...@@ -198,6 +198,8 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
int64_t qStreamExtractOffsetUid(qTaskInfo_t tinfo);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
......
...@@ -486,9 +486,9 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe ...@@ -486,9 +486,9 @@ static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollRe
} }
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",version:%" PRId64, ",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version); taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) { if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
......
...@@ -157,6 +157,9 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta ...@@ -157,6 +157,9 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
} }
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (qStreamExtractOffsetUid(task) != 0) {
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1); pHandle->snapshotVer + 1);
break; break;
......
...@@ -993,6 +993,11 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { ...@@ -993,6 +993,11 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
return &pTaskInfo->streamInfo.metaRsp; return &pTaskInfo->streamInfo.metaRsp;
} }
int64_t qStreamExtractOffsetUid(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.currentOffset.uid;
}
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal)); memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册