提交 5b7855eb 编写于 作者: H Haojun Liao

other:merge 3.0

上级 335cb651
......@@ -4434,144 +4434,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
int64_t uid;
SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam;
static SSDataBlock* getTableDataBlockTemp(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info;
SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
blockDataCleanup(pBlock);
int64_t st = taosGetTimestampUs();
void* pStart = taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->readHandle;
tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
STsdbReader* reader = pInfo->pReader;
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
continue;
}
blockDataCleanup(pBlock);
SDataBlockInfo binfo = pBlock->info;
tsdbRetrieveDataBlockInfo(reader, &binfo);
blockDataEnsureCapacity(pBlock, binfo.rows);
pBlock->info.type = binfo.type;
pBlock->info.uid = binfo.uid;
pBlock->info.window = binfo.window;
pBlock->info.rows = binfo.rows;
if (tsdbIsAscendingOrder(pInfo->pReader)) {
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
} else {
pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
}
uint32_t status = 0;
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupId) {
pBlock->info.groupId = *groupId;
}
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
tsdbReaderClose(pInfo->pReader);
pInfo->pReader = NULL;
return pBlock;
}
tsdbReaderClose(pInfo->pReader);
pInfo->pReader = NULL;
return NULL;
}
static SSDataBlock* getTableDataBlock2(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
int64_t uid = source->uid;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info;
int64_t st = taosGetTimestampUs();
blockDataCleanup(pBlock);
STsdbReader* reader = pTableScanInfo->pReader;
while (tsdbTableNextDataBlock(reader, uid)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
continue;
}
blockDataCleanup(pBlock);
SDataBlockInfo binfo = pBlock->info;
tsdbRetrieveDataBlockInfo(reader, &binfo);
blockDataEnsureCapacity(pBlock, binfo.rows);
pBlock->info.type = binfo.type;
pBlock->info.uid = binfo.uid;
pBlock->info.window = binfo.window;
pBlock->info.rows = binfo.rows;
uint32_t status = 0;
int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupId) {
pBlock->info.groupId = *groupId;
}
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
}
return NULL;
}
static SSDataBlock* getTableDataBlock(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册