tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset,pInfo->tqReader->pWalReader->curVersion-1);//curVersion move to next, so currentOffset = curVersion - 1
tqOffsetResetToLog(
&pTaskInfo->streamInfo.currentOffset,
pInfo->tqReader->pWalReader->curVersion-1);// curVersion move to next, so currentOffset = curVersion - 1
if(ret.fetchType==FETCH_TYPE__DATA){
if(ret.fetchType==FETCH_TYPE__DATA){
qDebug("doQueueScan get data from log %"PRId64" rows, version:%"PRId64,ret.data.info.rows,pTaskInfo->streamInfo.currentOffset.version);
qDebug("doQueueScan get data from log %"PRId64" rows, version:%"PRId64,ret.data.info.rows,
pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes);
blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo,&ret.data,true);
setBlockIntoRes(pInfo,&ret.data,true);
if(pInfo->pRes->info.rows>0){
if(pInfo->pRes->info.rows>0){
qDebug("doQueueScan get data from log %"PRId64" rows, return, version:%"PRId64,pInfo->pRes->info.rows,pTaskInfo->streamInfo.currentOffset.version);
qDebug("doQueueScan get data from log %"PRId64" rows, return, version:%"PRId64,pInfo->pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);
returnpInfo->pRes;
returnpInfo->pRes;
}
}
}elseif(ret.fetchType==FETCH_TYPE__NONE){
}elseif(ret.fetchType==FETCH_TYPE__NONE){
qDebug("doQueueScan get none from log, return, version:%"PRId64,pTaskInfo->streamInfo.currentOffset.version);
qDebug("doQueueScan get none from log, return, version:%"PRId64,pTaskInfo->streamInfo.currentOffset.version);