提交 50ee0dab 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 d48a5632
......@@ -3290,9 +3290,31 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) {
pReader->status.loadFromFile = false;
return TSDB_CODE_SUCCESS;
} else {
return initForFirstBlockInFile(pReader, pBlockIter);
}
}
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
const char* idstr) {
STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1;
pCond->twindows.ekey -= 1;
}
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
......@@ -3300,21 +3322,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
// check for query time window
STsdbReader* pReader = *ppReader;
if (isEmptyQueryTimeWindow(&pReader->window)) {
if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
// update the SQueryTableDataCond to create inner reader
STimeWindow w = pCond->twindows;
int32_t order = pCond->order;
if (order == TSDB_ORDER_ASC) {
pCond->twindows.ekey = pCond->twindows.skey;
pCond->twindows.ekey = window.skey;
pCond->twindows.skey = INT64_MIN;
pCond->order = TSDB_ORDER_DESC;
} else {
pCond->twindows.skey = pCond->twindows.ekey;
pCond->twindows.skey = window.ekey;
pCond->twindows.ekey = INT64_MAX;
pCond->order = TSDB_ORDER_ASC;
}
......@@ -3326,11 +3347,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
}
if (order == TSDB_ORDER_ASC) {
pCond->twindows.skey = w.ekey;
pCond->twindows.skey = window.ekey;
pCond->twindows.ekey = INT64_MAX;
} else {
pCond->twindows.skey = INT64_MIN;
pCond->twindows.ekey = w.ekey;
pCond->twindows.ekey = window.ekey;
}
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
if (code != TSDB_CODE_SUCCESS) {
......@@ -3340,13 +3361,13 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
// NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
if (pCond->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr);
}
} else if (taosArrayGetSize(pTableList) > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr);
}
......@@ -3368,40 +3389,36 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
}
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) {
pReader->status.loadFromFile = false;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
STsdbReader* pPrevReader = pReader->innerReader[0];
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
STsdbReader* pPrevReader = pReader->innerReader[0];
STsdbReader* pNextReader = pReader->innerReader[1];
code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap, pReader->idStr);
// we need only one row
pPrevReader->capacity = 1;
pPrevReader->status.pTableMap = pReader->status.pTableMap;
pPrevReader->pReadSnap = pReader->pReadSnap;
pNextReader->capacity = 1;
pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->pReadSnap = pReader->pReadSnap;
code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
return code;
}
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);
code = doOpenReaderImpl(pNextReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in files, let's try buffer in memory
if (pPrevReader->status.fileIter.numOfFiles == 0) {
pPrevReader->status.loadFromFile = false;
} else {
code = initForFirstBlockInFile(pPrevReader, pBlockIter);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
......@@ -3418,6 +3435,19 @@ void tsdbReaderClose(STsdbReader* pReader) {
return;
}
{
if (pReader->innerReader[0] != NULL) {
pReader->innerReader[0]->status.pTableMap = NULL;
pReader->innerReader[0]->pReadSnap = NULL;
pReader->innerReader[1]->status.pTableMap = NULL;
pReader->innerReader[1]->pReadSnap = NULL;
tsdbReaderClose(pReader->innerReader[0]);
tsdbReaderClose(pReader->innerReader[1]);
}
}
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
taosMemoryFreeClear(pSupInfo->plist);
......@@ -3508,32 +3538,31 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false;
}
if (pReader->innerReader[0] != NULL) {
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey);
if (ret) {
pReader->step = EXTERNAL_ROWS_PREV;
return ret;
}
}
tsdbReaderClose(pReader->innerReader[0]);
pReader->innerReader[0] = NULL;
if (pReader->step == EXTERNAL_ROWS_PREV) {
pReader->step = EXTERNAL_ROWS_MAIN;
}
pReader->step = EXTERNAL_ROWS_MAIN;
bool ret = doTsdbNextDataBlock(pReader);
if (ret) {
return ret;
}
if (pReader->innerReader[1] != NULL) {
if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
if (ret1) {
pReader->step = EXTERNAL_ROWS_NEXT;
return ret1;
}
tsdbReaderClose(pReader->innerReader[1]);
pReader->innerReader[1] = NULL;
}
return false;
......
......@@ -2213,15 +2213,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) {
// doSetOperatorCompleted(pOperator);
// }
//
// return pResBlock;
// }
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
......@@ -2534,6 +2525,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey;
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
pScanInfo->cond.twindows = pInfo->win;
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL;
pOperator->name = "TimeSliceOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
pOperator->blocking = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册