提交 506c192b 编写于 作者: wmmhello's avatar wmmhello

fix:error in TD-23218 & remove useless logic

上级 f8007803
......@@ -198,8 +198,6 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
int64_t qStreamExtractOffsetUid(qTaskInfo_t tinfo);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
......
......@@ -156,35 +156,32 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
}
}
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (qStreamExtractOffsetUid(task) != 0) {
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
break;
}
// get meta
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
if (tmp->metaRspLen > 0) {
qStreamExtractOffset(task, &tmp->rspOffset);
*pMetaRsp = *tmp;
if (pRsp->blockNum > 0) {
tqDebug("tmqsnap task exec exited, get data");
tqDebug("tmqsnap task get data");
break;
}
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
*pOffset = tmp->rspOffset;
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
tqDebug("tmqsnap task exec change to get data");
if (pDataBlock == NULL) {
qStreamExtractOffset(task, pOffset);
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
continue;
}
*pMetaRsp = *tmp;
tqDebug("tmqsnap task exec exited, get meta");
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
qStreamExtractOffset(task, &pRsp->rspOffset);
break;
}
if (pRsp->blockNum > 0) {
tqDebug("tmqsnap task exec exited, get data");
qStreamExtractOffset(task, &pRsp->rspOffset);
break;
}
}
return 0;
}
......
......@@ -993,11 +993,6 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
return &pTaskInfo->streamInfo.metaRsp;
}
int64_t qStreamExtractOffsetUid(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.currentOffset.uid;
}
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal));
......
......@@ -2087,17 +2087,16 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
}
SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
STqOffsetVal offset = {0};
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal");
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->sContext->snapVersion);
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
} else {
STqOffsetVal offset = {0};
tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN);
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
}
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
tDeleteSSchemaWrapper(mtInfo.schema);
qDebug("tmqsnap stream scan tsdb return null");
return NULL;
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
SSnapContext* sContext = pInfo->sContext;
......@@ -2112,10 +2111,11 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
}
if (!sContext->queryMeta) { // change to get data next poll request
tqOffsetResetToData(&pTaskInfo->streamInfo.metaRsp.rspOffset, 0, INT64_MIN);
STqOffsetVal offset = {0};
tqOffsetResetToData(&offset, 0, INT64_MIN);
qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
} else {
tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.currentOffset;
pTaskInfo->streamInfo.metaRsp.resMsgType = type;
pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
pTaskInfo->streamInfo.metaRsp.metaRsp = data;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册