提交 d57b3f38 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 405e201c
......@@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERYEXECUTOR_H
#define TDENGINE_QUERYEXECUTOR_H
#ifndef TDENGINE_QEXECUTOR_H
#define TDENGINE_QEXECUTOR_H
#include "os.h"
......@@ -45,21 +45,16 @@ enum {
// when query starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u,
/* result output buffer is full, current query is paused.
* this status is only exist in group-by clause and diff/add/division/multiply/ query.
*/
QUERY_RESBUF_FULL = 0x2u,
/* query is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
QUERY_COMPLETED = 0x4u,
QUERY_COMPLETED = 0x2u,
/* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option
*/
QUERY_OVER = 0x8u,
QUERY_OVER = 0x4u,
};
typedef struct SResultRowPool {
......@@ -197,7 +192,6 @@ typedef struct SQuery {
int32_t interBufSize; // intermediate buffer sizse
SOrderVal order;
int16_t numOfCols;
int16_t numOfTags;
......@@ -242,7 +236,6 @@ typedef struct SQueryRuntimeEnv {
uint32_t status; // query status
void* qinfo;
uint16_t scanFlag; // denotes reversed scan of data or not
// SFillInfo* pFillInfo; // todo move to operatorInfo
void* pQueryHandle;
int32_t prevGroupId; // previous executed group id
......@@ -353,8 +346,6 @@ typedef struct SQueryParam {
} SQueryParam;
typedef struct STableScanInfo {
SQueryRuntimeEnv *pRuntimeEnv;
void *pQueryHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
......@@ -372,7 +363,6 @@ typedef struct STableScanInfo {
SExprInfo *pExpr;
SSDataBlock block;
bool loadExternalRows; // load external rows (prev & next rows)
bool externalLoaded; // external rows loaded
int32_t numOfOutput;
int64_t elapsedTime;
......@@ -445,7 +435,6 @@ bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(SQuery* pQuery);
void tableQueryImpl(SQInfo *pQInfo);
bool isValidQInfo(void *param);
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
......@@ -457,4 +446,4 @@ void freeQInfo(SQInfo *pQInfo);
int32_t getMaximumIdleDurationSec();
#endif // TDENGINE_QUERYEXECUTOR_H
#endif // TDENGINE_QEXECUTOR_H
......@@ -49,6 +49,11 @@ enum {
TS_JOIN_TAG_NOT_EQUALS = 2,
};
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;
typedef struct {
int32_t status; // query status
TSKEY lastKey; // the lastKey value before query executed
......@@ -135,8 +140,8 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
}
tw->ekey -= 1;
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset);
......@@ -180,6 +185,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
......@@ -187,9 +193,8 @@ static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock*
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* offset);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static bool isPointInterpoQuery(SQuery *pQuery);
// setup the output buffer for each operator
......@@ -234,14 +239,13 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3
int64_t maxOutput = 0;
for (int32_t j = 0; j < numOfOutput; ++j) {
int32_t functionId = pCtx[j].functionId;
int32_t id = pCtx[j].functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if (hasMainFunction &&
(functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) {
if (hasMainFunction && (id == TSDB_FUNC_TS || id == TSDB_FUNC_TAG || id == TSDB_FUNC_TAGPRJ)) {
continue;
}
......@@ -255,14 +259,14 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3
return maxOutput;
}
static void setNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
static void clearNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t j = 0; j < numOfOutput; ++j) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
pResInfo->numOfRes = 0;
}
}
bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) {
static bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) {
if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
return false;
}
......@@ -282,7 +286,7 @@ bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) {
return false;
}
bool isStabledev(SQuery* pQuery) {
static bool isStabledev(SQuery* pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functId = pQuery->pExpr1[i].base.functionId;
if (functId == TSDB_FUNC_STDDEV_DST) {
......@@ -293,30 +297,6 @@ bool isStabledev(SQuery* pQuery) {
return false;
}
int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) {
assert(pGroupbyExpr != NULL);
int32_t colId = -2;
int16_t type = TSDB_DATA_TYPE_NULL;
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
colId = pColIndex->colId;
break;
}
}
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (colId == pQuery->colList[i].colId) {
type = pQuery->colList[i].type;
break;
}
}
return type;
}
static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
bool hasTags = false;
int32_t numOfSelectivity = 0;
......@@ -619,16 +599,6 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
return TSDB_CODE_SUCCESS;
}
static UNUSED_FUNC bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) {
assert(slot >= 0 && slot < pWindowResInfo->size);
return pWindowResInfo->pResult[slot]->closed;
}
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
if (type == RESULT_ROW_START_INTERP) {
......@@ -4073,10 +4043,13 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) {
}
}
static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
static SSDataBlock* doTableScanImpl(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = &pTableScanInfo->block;
SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery;
STableGroupInfo* pTableGroupInfo = &pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo;
SQuery* pQuery = pOperator->pRuntimeEnv->pQuery;
STableGroupInfo* pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo;
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
pTableScanInfo->numOfBlocks += 1;
......@@ -4097,9 +4070,9 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
// this function never returns error?
uint32_t status;
int32_t code = loadDataBlockOnDemand(pTableScanInfo->pRuntimeEnv, pTableScanInfo, pBlock, &status);
int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTableScanInfo->pRuntimeEnv->env, code);
longjmp(pOperator->pRuntimeEnv->env, code);
}
// current block is ignored according to filter result by block statistics data, continue load the next block
......@@ -4117,13 +4090,13 @@ static SSDataBlock* doTableScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
STableScanInfo *pTableScanInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
while (pTableScanInfo->current < pTableScanInfo->times) {
SSDataBlock* p = doTableScanImpl(pTableScanInfo);
SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) {
return p;
}
......@@ -4176,7 +4149,7 @@ static SSDataBlock* doTableScan(void* param) {
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey;
}
SSDataBlock* p = doTableScanImpl(pTableScanInfo);
SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) {
return p;
}
......@@ -4185,32 +4158,6 @@ static SSDataBlock* doTableScan(void* param) {
return NULL;
}
static SSDataBlock* doSeqTableBlocksScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
STableScanInfo *pTableScanInfo = pOperator->info;
// SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv;
// int32_t totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
// while (1) {
return doTableScanImpl(pTableScanInfo);
// SSDataBlock* p = doTableScanImpl(pTableScanInfo);
// if (p != NULL) {
// return p;
// }
// try the next table
// if (++pTableScanInfo->tableIndex >= totalTables) {
// return NULL;
// }
//
// setTableQueryHandle(pRuntimeEnv, pTableScanInfo->tableIndex);
// pTableScanInfo->pQueryHandle = pRuntimeEnv->pQueryHandle;
// pTableScanInfo->externalLoaded = false;
// }
}
static SSDataBlock* doBlockInfoScan(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*)param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -4220,7 +4167,7 @@ static SSDataBlock* doBlockInfoScan(void* param) {
STableScanInfo *pTableScanInfo = pOperator->info;
STableBlockDist tableBlockDist = {0};
tableBlockDist.numOfTables = pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
tableBlockDist.numOfTables = pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
tableBlockDist.dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo));
tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, &tableBlockDist);
......@@ -4258,7 +4205,6 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->current = 0;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableScanOperator";
......@@ -4266,6 +4212,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doTableScan;
return pOperator;
......@@ -4279,7 +4226,6 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->current = 0;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableSeqScanOperator";
......@@ -4288,7 +4234,8 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->exec = doSeqTableBlocksScan;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doTableScanImpl;
return pOperator;
}
......@@ -4297,7 +4244,6 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
SColumnInfoData infoData = {{0}};
......@@ -4372,11 +4318,11 @@ static SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQuery
pInfo->reverseTimes = reverseTime;
pInfo->current = 0;
pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv;
pOptr->blockingOptr = false;
pOptr->info = pInfo;
pOptr->exec = doTableScan;
......@@ -4519,12 +4465,12 @@ static SSDataBlock* doArithmeticOperation(void* param) {
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pInfo->pRes->info.rows >= 4096) {
if (pInfo->pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
break;
}
}
setNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
}
......@@ -5242,24 +5188,6 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
return pOperator;
}
void tableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
// SQuery * pQuery = pRuntimeEnv->pQuery;
// number of points returned during this query
int64_t st = taosGetTimestampUs();
// assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1);
// SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0);
// pQuery->current = taosArrayGetP(g, 0);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
// record the total elapsed time
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
int32_t j = 0;
......@@ -6166,7 +6094,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
// todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册