提交 684db8eb 编写于 作者: H Haojun Liao

refactor(query): refactor some variable name and macro definitions.

上级 7873c74e
......@@ -110,7 +110,6 @@ typedef struct SFileBlockInfo {
#define FUNCTION_COV 38
typedef struct SResultRowEntryInfo {
// int8_t hasResult:6; // result generated, not NULL value
bool initialized:1; // output buffer has been initialized
bool complete:1; // query has completed
uint8_t isNullRes:6; // the result is null
......@@ -119,10 +118,10 @@ typedef struct SResultRowEntryInfo {
// determine the real data need to calculated the result
enum {
BLK_DATA_NO_NEEDED = 0x0,
BLK_DATA_STATIS_NEEDED = 0x1,
BLK_DATA_ALL_NEEDED = 0x3,
BLK_DATA_DISCARD = 0x4, // discard current data block since it is not qualified for filter
BLK_DATA_NOT_LOAD = 0x0,
BLK_DATA_SMA_LOAD = 0x1,
BLK_DATA_DATA_LOAD = 0x3,
BLK_DATA_FILTEROUT = 0x4, // discard current data block since it is not qualified for filter
};
enum {
......
......@@ -46,6 +46,14 @@ extern "C" {
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms
enum {
RES_TYPE__QUERY = 1,
RES_TYPE__TMQ,
};
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
typedef struct SAppInstInfo SAppInstInfo;
typedef struct {
......
......@@ -2004,7 +2004,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
bool hasFirstLastFunc = false;
bool hasOtherFunc = false;
if (status == BLK_DATA_ALL_NEEDED || status == BLK_DATA_DISCARD) {
if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) {
return status;
}
......@@ -2023,11 +2023,11 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
}
}
if (hasFirstLastFunc && status == BLK_DATA_NO_NEEDED) {
if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) {
if (!hasOtherFunc) {
return BLK_DATA_DISCARD;
return BLK_DATA_FILTEROUT;
} else {
return BLK_DATA_ALL_NEEDED;
return BLK_DATA_DATA_LOAD;
}
}
......@@ -2360,7 +2360,7 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVarian
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
uint32_t status = BLK_DATA_NO_NEEDED;
uint32_t status = BLK_DATA_NOT_LOAD;
int32_t numOfOutput = pTableScanInfo->numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) {
......@@ -2369,11 +2369,11 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
// group by + first/last should not apply the first/last block filter
if (functionId < 0) {
status |= BLK_DATA_ALL_NEEDED;
status |= BLK_DATA_DATA_LOAD;
return status;
} else {
// status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
// if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
// if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
// return status;
// }
}
......@@ -2384,7 +2384,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
*status = BLK_DATA_NO_NEEDED;
*status = BLK_DATA_NOT_LOAD;
pBlock->pDataBlock = NULL;
pBlock->pBlockAgg = NULL;
......@@ -2397,36 +2397,15 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
#if 0
if (pRuntimeEnv->pTsBuf != NULL) {
(*status) = BLK_DATA_ALL_NEEDED;
if (pQueryAttr->stableQuery) { // todo refactor
SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0];
int16_t tagId = (int16_t)pExprInfo->base.param[0].i;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagId);
// compare tag first
SVariant t = {0};
doSetTagValueInParam(pRuntimeEnv->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes);
setTimestampListJoinInfo(pRuntimeEnv, &t, pRuntimeEnv->current);
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (taosVariantCompare(&t, elem.tag) != 0))) {
(*status) = BLK_DATA_DISCARD;
return TSDB_CODE_SUCCESS;
}
}
}
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
(QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
(*status) = BLK_DATA_ALL_NEEDED;
(*status) = BLK_DATA_DATA_LOAD;
}
// check if this data block is required to load
if ((*status) != BLK_DATA_ALL_NEEDED) {
if ((*status) != BLK_DATA_DATA_LOAD) {
bool needFilter = true;
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
......@@ -2458,18 +2437,18 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
if (needFilter) {
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
} else {
(*status) = BLK_DATA_ALL_NEEDED;
(*status) = BLK_DATA_DATA_LOAD;
}
}
SDataBlockInfo* pBlockInfo = &pBlock->info;
// *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);
if ((*status) == BLK_DATA_NO_NEEDED || (*status) == BLK_DATA_DISCARD) {
if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
//qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
// pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->discardBlocks += 1;
} else if ((*status) == BLK_DATA_STATIS_NEEDED) {
} else if ((*status) == BLK_DATA_SMA_LOAD) {
// this function never returns error?
pCost->loadBlockStatis += 1;
// tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
......@@ -2479,7 +2458,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
pCost->totalCheckedRows += pBlock->info.rows;
}
} else {
assert((*status) == BLK_DATA_ALL_NEEDED);
assert((*status) == BLK_DATA_DATA_LOAD);
// load the data block statistics to perform further filter
pCost->loadBlockStatis += 1;
......@@ -2511,7 +2490,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
pCost->discardBlocks += 1;
//qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = BLK_DATA_DISCARD;
(*status) = BLK_DATA_FILTEROUT;
return TSDB_CODE_SUCCESS;
}
}
......@@ -2523,7 +2502,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
// pCost->discardBlocks += 1;
// qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
// pBlockInfo->window.ekey, pBlockInfo->rows);
// (*status) = BLK_DATA_DISCARD;
// (*status) = BLK_DATA_FILTEROUT;
// return TSDB_CODE_SUCCESS;
// }
......
......@@ -73,7 +73,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo,
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
*status = BLK_DATA_ALL_NEEDED;
*status = BLK_DATA_DATA_LOAD;
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
if (pCols == NULL) {
......@@ -138,7 +138,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
// }
// this function never returns error?
uint32_t status = BLK_DATA_ALL_NEEDED;
uint32_t status = BLK_DATA_DATA_LOAD;
int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
......@@ -146,7 +146,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
}
// current block is ignored according to filter result by block statistics data, continue load the next block
if (status == BLK_DATA_DISCARD || pBlock->info.rows == 0) {
if (status == BLK_DATA_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
......
......@@ -557,14 +557,14 @@ static void count_func_merge(SqlFunctionCtx *pCtx) {
*/
int32_t countRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
return BLK_DATA_NO_NEEDED;
return BLK_DATA_NOT_LOAD;
} else {
return BLK_DATA_STATIS_NEEDED;
return BLK_DATA_SMA_LOAD;
}
}
int32_t noDataRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_NO_NEEDED;
return BLK_DATA_NOT_LOAD;
}
#define LIST_ADD_N_DOUBLE_FLOAT(x, ctx, p, t, numOfElem, tsdbType) \
do { \
......@@ -743,76 +743,76 @@ static void sum_func_merge(SqlFunctionCtx *pCtx) {
}
static int32_t statisRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_STATIS_NEEDED;
return BLK_DATA_SMA_LOAD;
}
static int32_t dataBlockRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_ALL_NEEDED;
return BLK_DATA_DATA_LOAD;
}
// todo: if column in current data block are null, opt for this case
static int32_t firstFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED;
return BLK_DATA_NOT_LOAD;
}
// no result for first query, data block is required
if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) {
return BLK_DATA_ALL_NEEDED;
return BLK_DATA_DATA_LOAD;
} else {
return BLK_DATA_NO_NEEDED;
return BLK_DATA_NOT_LOAD;
}
}
static int32_t lastFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (pCtx->order != pCtx->param[0].i) {
return BLK_DATA_NO_NEEDED;
return BLK_DATA_NOT_LOAD;
}
if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) {
return BLK_DATA_ALL_NEEDED;
return BLK_DATA_DATA_LOAD;
} else {
return BLK_DATA_NO_NEEDED;
return BLK_DATA_NOT_LOAD;
}
}
static int32_t firstDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED;
return BLK_DATA_NOT_LOAD;
}
// not initialized yet, it is the first block, load it.
if (pCtx->pOutput == NULL) {
return BLK_DATA_ALL_NEEDED;
return BLK_DATA_DATA_LOAD;
}
// the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is
// the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->pOutput + pCtx->inputBytes);
if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED;
return BLK_DATA_DATA_LOAD;
} else { // data in current block is not earlier than current result
return (pInfo->ts <= w->skey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
return (pInfo->ts <= w->skey) ? BLK_DATA_NOT_LOAD : BLK_DATA_DATA_LOAD;
}
}
static int32_t lastDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (pCtx->order != pCtx->param[0].i) {
return BLK_DATA_NO_NEEDED;
return BLK_DATA_NOT_LOAD;
}
// not initialized yet, it is the first block, load it.
if (pCtx->pOutput == NULL) {
return BLK_DATA_ALL_NEEDED;
return BLK_DATA_DATA_LOAD;
}
// the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is
// the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->pOutput + pCtx->inputBytes);
if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED;
return BLK_DATA_DATA_LOAD;
} else {
return (pInfo->ts > w->ekey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
return (pInfo->ts > w->ekey) ? BLK_DATA_NOT_LOAD : BLK_DATA_DATA_LOAD;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册