tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset,pInfo->tqReader->pWalReader->curVersion-1);//curVersion move to next, so currentOffset = curVersion - 1
if(tqNextBlock(pInfo->tqReader,&ret)<0){
// if the end is reached, terrno is 0
if(terrno!=0){
qError("failed to get next log block since %s, %s",terrstr(),id);
}
}
if(ret.fetchType==FETCH_TYPE__DATA){
if(ret.fetchType==FETCH_TYPE__DATA){
qDebug("doQueueScan get data from log %"PRId64" rows, version:%"PRId64,ret.data.info.rows,pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes);
blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo,&ret.data,true);
setBlockIntoRes(pInfo,&ret.data,true);
if(pInfo->pRes->info.rows>0){
if(pInfo->pRes->info.rows>0){
pOperator->status=OP_EXEC_RECV;
qDebug("doQueueScan get data from log %"PRId64" rows, return, version:%"PRId64,pInfo->pRes->info.rows,pTaskInfo->streamInfo.currentOffset.version);