提交 1fc66469 编写于 作者: wmmhello's avatar wmmhello

fix:continue consume if wal is dropped

上级 c4d1b5aa
......@@ -388,7 +388,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
......@@ -403,7 +403,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
SSDataBlock* pRes = NULL;
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
......@@ -412,11 +412,11 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
}
} else {
pReader->nextBlk += 1;
tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
}
}
qDebug("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->msg.msgStr = NULL;
......@@ -604,7 +604,7 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol
}
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
SSDataBlock* pBlock = pReader->pResBlock;
......
......@@ -1078,6 +1078,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SOperatorInfo* pOperator = pTaskInfo->pRoot;
const char* id = GET_TASKID(pTaskInfo);
if(subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG){
pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
if (pOperator == NULL) {
return -1;
}
SStreamScanInfo* pInfo = pOperator->info;
SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
walReaderVerifyOffset(pWalReader, pOffset);
}
// if pOffset equal to current offset, means continue consume
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册