提交 86bfad2b 编写于 作者: H Haojun Liao

[TD-225] refactor

上级 8249d5f6
......@@ -253,6 +253,8 @@ typedef struct SQueryRuntimeEnv {
char* tagVal; // tag value of current data block
SArithmeticSupport *sasArray;
struct STableScanInfo* pi;
} SQueryRuntimeEnv;
enum {
......@@ -312,6 +314,7 @@ typedef struct SSDataBlock {
} SSDataBlock;
typedef struct STableScanInfo {
SQInfo* pQInfo;
void *pQueryHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
......@@ -321,6 +324,9 @@ typedef struct STableScanInfo {
int32_t order; // scan order
int32_t times; // repeat counts
int32_t current;
int32_t reverseTimes; // 0 by default
SSDataBlock block;
int64_t elapsedTime;
......
......@@ -172,6 +172,10 @@ static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArr
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
static STableIdInfo createTableIdInfo(SQuery* pQuery);
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime, int32_t reverseTime);
static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime);
static int32_t getNumOfScanTimes(SQuery* pQuery);
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
......@@ -3461,6 +3465,45 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
}
}
static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) {
SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->pTsBuf) {
SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order);
bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
assert(ret);
}
// reverse order time range
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
if (QUERY_IS_ASC_QUERY(pQuery)) {
assert(pQuery->window.skey <= pQuery->window.ekey);
} else {
assert(pQuery->window.skey >= pQuery->window.ekey);
}
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pRuntimeEnv);
disableFuncInReverseScan(pQInfo);
setupQueryRangeForReverseScan(pQInfo);
// clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
}
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
......@@ -4643,6 +4686,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
if (needReverseScan(pQuery)) {
pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery), 1);
} else {
pRuntimeEnv->pi = createTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery));
}
if (pTsBuf != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
......@@ -5586,59 +5635,128 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
}
static SSDataBlock* doTableScan(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*) param;
STableScanInfo * pTableScanInfo = (STableScanInfo *)param;
SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv;
SSDataBlock* pBlock = &pTableScanInfo->block;
while(tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
pTableScanInfo->numOfBlocks += 1;
SSDataBlock *pBlock = &pTableScanInfo->block;
while (pTableScanInfo->current < pTableScanInfo->times) {
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
pTableScanInfo->numOfBlocks += 1;
// todo check for query cancel
// todo check for query cancel
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
SDataStatis *pStatis = pBlock->pBlockStatis;
SDataStatis *pStatis = pBlock->pBlockStatis;
// this function never returns error?
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis);
pTableScanInfo->numOfBlockStatis += 1;
// this function never returns error?
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis);
pTableScanInfo->numOfBlockStatis += 1;
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
pTableScanInfo->numOfRows += pBlock->info.rows;
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
pTableScanInfo->numOfRows += pBlock->info.rows;
}
return pBlock;
}
return pBlock;
}
if (++pTableScanInfo->current >= pTableScanInfo->times) {
return NULL;
}
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle);
STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window);
pTableScanInfo->pQueryHandle =
tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo,
pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef);
if (pTableScanInfo->pQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
pRuntimeEnv->resultRowInfo.curIndex = qstatus.windowIndex;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN;
pRuntimeEnv->resultRowInfo.curIndex = 0;
setQueryStatus(pRuntimeEnv->pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN;
if (pRuntimeEnv->pTsBuf) {
bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
assert(ret);
if (pRuntimeEnv->pTsBuf) {
bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
assert(ret);
}
qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey);
}
qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"PRId64"-%"PRId64, pQInfo,
cond.twindow.skey, cond.twindow.ekey);
if (pTableScanInfo->reverseTimes > 0) {
setEnvBeforeReverseScan_rv(pRuntimeEnv);
tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle);
STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window);
pTableScanInfo->pQueryHandle =
tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo,
pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef);
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
pTableScanInfo->numOfBlocks += 1;
// todo check for query cancel
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
SDataStatis *pStatis = pBlock->pBlockStatis;
// this function never returns error?
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis);
pTableScanInfo->numOfBlockStatis += 1;
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
pTableScanInfo->numOfRows += pBlock->info.rows;
}
return pBlock;
}
qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey);
}
return NULL;
}
static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, int32_t repeatTime) {
static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->apply = doTableScan;
pInfo->times = repeatTime;
pInfo->reverseTimes = 0;
pInfo->current = 0;
pInfo->pQInfo = pQInfo;
return pInfo;
}
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime, int32_t reverseTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->apply = doTableScan;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->current = 0;
pInfo->pQInfo = pQInfo;
return pInfo;
}
static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->current;
}
// this is a blocking operator
static SSDataBlock* doAggOperator(void* param) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
......@@ -5662,12 +5780,19 @@ static SSDataBlock* doAggOperator(void* param) {
pInfo->pRuntimeEnv->pCtx[i].pOutput = idata.pData;
}
pQuery->pos = 0;
int32_t countId = 0;
while(1) {
SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo);
if (pBlock == NULL) {
break;
}
if (countId != getTableScanTime(pInfo->pTableScanInfo)) {
needRepeatScan(pInfo->pRuntimeEnv);
}
blockwiseApplyFunctions(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pInfo->pResultRowInfo, binarySearchForKey, pBlock->pDataBlock);
}
......@@ -5678,6 +5803,18 @@ static SSDataBlock* doAggOperator(void* param) {
return res;
}
// todo set the attribute of query scan count
static int32_t getNumOfScanTimes(SQuery* pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_STDDEV || functionId == TSDB_FUNC_PERCT) {
return 2;
}
}
return 1;
}
static UNUSED_FUNC SAggOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
......@@ -5705,20 +5842,15 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
return;
}
STableScanInfo* pi = createTableScanInfo(pRuntimeEnv->pQueryHandle);
SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pi);
SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
SSDataBlock* pResBlock = pAggInfo->apply(pAggInfo);
// scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
// since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously.
// since the numOfRows must be identical for all functions that are allowed to be executed simutaneously.
// pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
pQuery->rec.rows = pResBlock->info.rows;//getNumOfResult(pRuntimeEnv);
doSecondaryArithmeticProcess(pQuery);
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// doSecondaryArithmeticProcess(pQuery);
// TODO limit/offset refactor to be one operator
skipResults(pRuntimeEnv);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册