提交 74887574 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 61cb98c9
...@@ -37,9 +37,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int ...@@ -37,9 +37,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows)
...@@ -241,10 +238,10 @@ struct SOperatorInfo; ...@@ -241,10 +238,10 @@ struct SOperatorInfo;
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
jmp_buf env; jmp_buf env;
SQuery* pQuery; SQuery* pQuery;
uint32_t status; // query status uint32_t status; // query status
void* qinfo; void* qinfo;
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo; // todo move to operatorInfo // SFillInfo* pFillInfo; // todo move to operatorInfo
void* pQueryHandle; void* pQueryHandle;
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
...@@ -262,14 +259,13 @@ typedef struct SQueryRuntimeEnv { ...@@ -262,14 +259,13 @@ typedef struct SQueryRuntimeEnv {
SArithmeticSupport *sasArray; SArithmeticSupport *sasArray;
SSDataBlock *outputBuf; SSDataBlock *outputBuf;
int32_t tableIndex; //TODO remove it
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo *proot; struct SOperatorInfo *proot;
struct SOperatorInfo *pTableScanner; // table scan operator struct SOperatorInfo *pTableScanner; // table scan operator
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
SRspResultInfo resultInfo; SRspResultInfo resultInfo;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
enum { enum {
...@@ -278,15 +274,31 @@ enum { ...@@ -278,15 +274,31 @@ enum {
OP_EXEC_DONE = 3, OP_EXEC_DONE = 3,
}; };
enum OPERATOR_TYPE_E {
OP_TableScan = 1,
OP_DataBlocksOptScan = 2,
OP_TableSeqScan = 3,
OP_TagScan = 4,
OP_Aggregate = 5,
OP_Arithmetic = 6,
OP_Groupby = 7,
OP_Limit = 8,
OP_Offset = 9,
OP_TimeInterval = 10,
OP_Fill = 11,
OP_MultiTableAggregate = 12,
OP_MultiTableTimeInterval = 13,
};
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
uint8_t operatorType; uint8_t operatorType;
bool blockingOptr; // block operator or not bool blockingOptr; // block operator or not
uint8_t status; // denote if current operator is completed uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results int32_t numOfOutput; // number of columns of the current operator results
char *name; // name, used to show the query execution plan char *name; // name, used to show the query execution plan
void *info; // extension attribution void *info; // extension attribution
SExprInfo *pExpr; SExprInfo *pExpr;
SQueryRuntimeEnv *pRuntimeEnv; SQueryRuntimeEnv *pRuntimeEnv;
struct SOperatorInfo *upstream; struct SOperatorInfo *upstream;
__operator_fn_t exec; __operator_fn_t exec;
...@@ -402,6 +414,7 @@ typedef struct SOffsetOperatorInfo { ...@@ -402,6 +414,7 @@ typedef struct SOffsetOperatorInfo {
} SOffsetOperatorInfo; } SOffsetOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo;
SSDataBlock *pRes; SSDataBlock *pRes;
int64_t totalInputRows; int64_t totalInputRows;
} SFillOperatorInfo; } SFillOperatorInfo;
......
...@@ -80,8 +80,6 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo); ...@@ -80,8 +80,6 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput);
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const struct SSDataBlock* pInput); void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput); void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput);
......
...@@ -27,8 +27,6 @@ ...@@ -27,8 +27,6 @@
#include "tlosertree.h" #include "tlosertree.h"
#include "ttype.h" #include "ttype.h"
#define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1)
/** /**
* check if the primary column is load by default, otherwise, the program will * check if the primary column is load by default, otherwise, the program will
* forced to load primary column explicitly. * forced to load primary column explicitly.
...@@ -150,7 +148,6 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { ...@@ -150,7 +148,6 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
} }
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
//static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset); int32_t numOfCols, int32_t* rowCellInfoOffset);
...@@ -168,31 +165,31 @@ static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); ...@@ -168,31 +165,31 @@ static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
static STableIdInfo createTableIdInfo(SQuery* pQuery); static STableIdInfo createTableIdInfo(SQuery* pQuery);
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t getNumOfScanTimes(SQuery* pQuery); static int32_t getNumOfScanTimes(SQuery* pQuery);
static bool isFixedOutputQuery(SQuery* pQuery); static bool isFixedOutputQuery(SQuery* pQuery);
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); static SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
...@@ -1723,10 +1720,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1723,10 +1720,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
// interval (down sampling operation) // interval (down sampling operation)
if (QUERY_IS_INTERVAL_QUERY(pQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pQuery->stableQuery) { if (pQuery->stableQuery) {
pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); pRuntimeEnv->proot = createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
} else { } else {
pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); pRuntimeEnv->proot = createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) { if (pQuery->pExpr2 != NULL) {
...@@ -1740,7 +1737,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1740,7 +1737,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
} else if (pQuery->groupbyColumn) { } else if (pQuery->groupbyColumn) {
pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); pRuntimeEnv->proot = createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) { if (pQuery->pExpr2 != NULL) {
...@@ -1748,7 +1745,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1748,7 +1745,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
} else if (isFixedOutputQuery(pQuery)) { } else if (isFixedOutputQuery(pQuery)) {
if (pQuery->stableQuery && !isTsCompQuery(pQuery)) { if (pQuery->stableQuery && !isTsCompQuery(pQuery)) {
pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); pRuntimeEnv->proot = createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
} else { } else {
pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
} }
...@@ -1811,8 +1808,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1811,8 +1808,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->sasArray); tfree(pRuntimeEnv->sasArray);
} }
pRuntimeEnv->pFillInfo = taosDestroyFillInfo(pRuntimeEnv->pFillInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyResultBuf(pRuntimeEnv->pResultBuf);
doFreeQueryHandle(pRuntimeEnv); doFreeQueryHandle(pRuntimeEnv);
...@@ -2936,7 +2931,7 @@ void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold ...@@ -2936,7 +2931,7 @@ void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
cleanupGroupResInfo(pGroupResInfo); cleanupGroupResInfo(pGroupResInfo);
if (!incNextGroup(pGroupResInfo)) { if (!incNextGroup(pGroupResInfo)) {
SET_STABLE_QUERY_OVER(pRuntimeEnv); break;
} }
} }
...@@ -3408,20 +3403,6 @@ bool requireTimestamp(SQuery *pQuery) { ...@@ -3408,20 +3403,6 @@ bool requireTimestamp(SQuery *pQuery) {
return false; return false;
} }
bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
/*
* 1. if skey or ekey locates in this block, we need to load the timestamp column to decide the precise position
* 2. if there are top/bottom, first_dst/last_dst functions, we need to load timestamp column in any cases;
*/
STimeWindow *w = &pDataBlockInfo->window;
STableQueryInfo* pTableQueryInfo = pQuery->current;
bool loadPrimaryTS = (pTableQueryInfo->lastKey >= w->skey && pTableQueryInfo->lastKey <= w->ekey) ||
(pQuery->window.ekey >= w->skey && pQuery->window.ekey <= w->ekey) || requireTimestamp(pQuery);
return loadPrimaryTS;
}
//static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType) { //static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType) {
// void* qinfo = pRuntimeEnv->qinfo; // void* qinfo = pRuntimeEnv->qinfo;
// SQuery *pQuery = pRuntimeEnv->pQuery; // SQuery *pQuery = pRuntimeEnv->pQuery;
...@@ -3592,48 +3573,44 @@ static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv, ...@@ -3592,48 +3573,44 @@ static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv,
} }
} }
bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) { //bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery; // SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; //// SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
//
if (!Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { // if (!Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) {
return false; // return false;
} // }
//
if (pQuery->limit.limit > 0 && pRuntimeEnv->resultInfo.total >= pQuery->limit.limit) { // if (pQuery->limit.limit > 0 && pRuntimeEnv->resultInfo.total >= pQuery->limit.limit) {
return false; // return false;
} // }
//
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { // if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
// There are results not returned to client yet, so filling applied to the remain result is required firstly. // // There are results not returned to client yet, so filling applied to the remain result is required firstly.
if (taosFillHasMoreResults(pFillInfo)) { // if (taosFillHasMoreResults(pFillInfo)) {
return true; // return true;
} // }
//
/* // /*
* While the code reaches here, there are no results remains now. // * While the code reaches here, there are no results remains now.
* If query is not completed yet, the gaps between two results blocks need to be handled after next data block // * If query is not completed yet, the gaps between two results blocks need to be handled after next data block
* is retrieved from TSDB. // * is retrieved from TSDB.
* // *
* NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result // * NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result
* set is the FIRST result block, the gap between the start time of query time window and the timestamp of the // * set is the FIRST result block, the gap between the start time of query time window and the timestamp of the
* first result row in the actual result set will fill nothing. // * first result row in the actual result set will fill nothing.
*/ // */
int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity); // int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity);
return numOfTotal > 0; // return numOfTotal > 0;
} else { // there are results waiting for returned to client. // } else { // there are results waiting for returned to client.
if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) && // if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) &&
(pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) { // (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) {
return true; // return true;
} // }
} // }
//
return false; // return false;
} //}
static int16_t getNumOfFinalResCol(SQuery* pQuery) {
return pQuery->pExpr2 == NULL? pQuery->numOfOutput:pQuery->numOfExpr2;
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
...@@ -3676,31 +3653,20 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -3676,31 +3653,20 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
qDebug("QInfo:%p set %d subscribe info", pQInfo, total); qDebug("QInfo:%p set %d subscribe info", pQInfo, total);
// Check if query is completed or not for stable query or normal table query respectively. // Check if query is completed or not for stable query or normal table query respectively.
if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) {
if (pQInfo->query.stableQuery) { setQueryStatus(pRuntimeEnv, QUERY_OVER);
if (IS_STASBLE_QUERY_OVER(&pQInfo->runtimeEnv)) {
setQueryStatus(pRuntimeEnv, QUERY_OVER);
}
} else {
if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pRuntimeEnv->groupResInfo)) {
setQueryStatus(pRuntimeEnv, QUERY_OVER);
}
}
} }
} }
int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutput) { int32_t doFillGapsInResults_rv(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) {
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES); void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
p[i] = pColInfoData->pData; p[i] = pColInfoData->pData;
} }
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pRuntimeEnv->resultInfo.capacity); pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity);
tfree(p); tfree(p);
return pOutput->info.rows; return pOutput->info.rows;
} }
...@@ -3787,6 +3753,7 @@ static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) { ...@@ -3787,6 +3753,7 @@ static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) {
int idx = (int32_t)((len - 1) * rate); int idx = (int32_t)((len - 1) * rate);
return ((SDataBlockInfo *)(taosArrayGet(pArray, idx)))->rows; return ((SDataBlockInfo *)(taosArrayGet(pArray, idx)))->rows;
} }
static int compareBlockInfo(const void *pLeft, const void *pRight) { static int compareBlockInfo(const void *pLeft, const void *pRight) {
int32_t left = ((SDataBlockInfo *)pLeft)->rows; int32_t left = ((SDataBlockInfo *)pLeft)->rows;
int32_t right = ((SDataBlockInfo *)pRight)->rows; int32_t right = ((SDataBlockInfo *)pRight)->rows;
...@@ -4126,18 +4093,16 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) ...@@ -4126,18 +4093,16 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
return terrno; return terrno;
} }
static SFillColInfo* createFillColInfo(SQuery* pQuery) { static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, int64_t* fillVal) {
int32_t numOfCols = getNumOfFinalResCol(pQuery);
int32_t offset = 0; int32_t offset = 0;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); SFillColInfo* pFillCol = calloc(numOfOutput, sizeof(SFillColInfo));
if (pFillCol == NULL) { if (pFillCol == NULL) {
return NULL; return NULL;
} }
// TODO refactor for(int32_t i = 0; i < numOfOutput; ++i) {
for(int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExprInfo = &pExpr[i];
SExprInfo* pExprInfo = (pQuery->pExpr2 == NULL)? &pQuery->pExpr1[i]:&pQuery->pExpr2[i];
pFillCol[i].col.bytes = pExprInfo->bytes; pFillCol[i].col.bytes = pExprInfo->bytes;
pFillCol[i].col.type = (int8_t)pExprInfo->type; pFillCol[i].col.type = (int8_t)pExprInfo->type;
...@@ -4145,7 +4110,7 @@ static SFillColInfo* createFillColInfo(SQuery* pQuery) { ...@@ -4145,7 +4110,7 @@ static SFillColInfo* createFillColInfo(SQuery* pQuery) {
pFillCol[i].tagIndex = -2; pFillCol[i].tagIndex = -2;
pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query
pFillCol[i].functionId = pExprInfo->base.functionId; pFillCol[i].functionId = pExprInfo->base.functionId;
pFillCol[i].fillVal.i = pQuery->fillVal[i]; pFillCol[i].fillVal.i = fillVal[i];
offset += pExprInfo->bytes; offset += pExprInfo->bytes;
} }
...@@ -4190,9 +4155,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts ...@@ -4190,9 +4155,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->resultInfo.capacity = 4096; pRuntimeEnv->resultInfo.capacity = 4096;
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
} else if (isTsCompQuery(pQuery) || isPointInterpoQuery(pQuery)) { } else if (isTsCompQuery(pQuery) || isPointInterpoQuery(pQuery)) {
pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, isPointInterpoQuery(pQuery)); pRuntimeEnv->pTableScanner = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, isPointInterpoQuery(pQuery));
} else if (needReverseScan(pQuery)) { } else if (needReverseScan(pQuery)) {
pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); pRuntimeEnv->pTableScanner = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
} else { } else {
pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
} }
...@@ -4228,20 +4193,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts ...@@ -4228,20 +4193,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
return code; return code;
} }
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
SFillColInfo* pColInfo = createFillColInfo(pQuery);
STimeWindow w = TSWINDOW_INITIALIZER;
TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey);
TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey);
getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w);
int32_t numOfCols = getNumOfFinalResCol(pQuery);
pRuntimeEnv->pFillInfo = taosCreateFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfCols,
pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision,
pQuery->fillType, pColInfo, pQInfo);
}
setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4469,38 +4420,37 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -4469,38 +4420,37 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQuery->order.order; pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->current = 0;
pInfo->current = 0; pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqScanTableOp"; pOperator->name = "TableScanOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->exec = doTableScan; pOperator->exec = doTableScan;
return pOperator; return pOperator;
} }
SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows) { SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = 1; pInfo->times = 1;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQuery->order.order; pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->current = 0;
pInfo->current = 0; pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->loadExternalRows = loadExternalRows; pInfo->loadExternalRows = loadExternalRows;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableBlockSeqScan"; pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = OP_TableSeqScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -4516,35 +4466,34 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -4516,35 +4466,34 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr
pTableScanInfo->numOfOutput = pDownstream->numOfOutput; pTableScanInfo->numOfOutput = pDownstream->numOfOutput;
char* name = pDownstream->name; if (pDownstream->operatorType == OP_Aggregate || pDownstream->operatorType == OP_MultiTableAggregate) {
if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) {
SAggOperatorInfo* pAggInfo = pDownstream->info; SAggOperatorInfo* pAggInfo = pDownstream->info;
pTableScanInfo->pCtx = pAggInfo->binfo.pCtx; pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
} else if (strcasecmp(name, "HashIntervalAgg") == 0) { } else if (pDownstream->operatorType == OP_TimeInterval) {
STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info; STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pCtx = pIntervalInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "HashGroupbyAgg") == 0) { } else if (pDownstream->operatorType == OP_Groupby) {
SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info;
pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx; pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset;
} else if (strcasecmp(name, "STableIntervalAggOp") == 0) { } else if (pDownstream->operatorType == OP_MultiTableTimeInterval) {
STableIntervalOperatorInfo *pInfo = pDownstream->info; STableIntervalOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
} else if (strcasecmp(name, "ArithmeticOp") == 0) { } else if (pDownstream->operatorType == OP_Arithmetic) {
SArithOperatorInfo *pInfo = pDownstream->info; SArithOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx; pTableScanInfo->pCtx = pInfo->binfo.pCtx;
...@@ -4555,24 +4504,23 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -4555,24 +4504,23 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} }
} }
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { static SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
pInfo->current = 0;
pInfo->current = 0; pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->order = pRuntimeEnv->pQuery->order.order; pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "BidirectionSeqScanTableOp"; pOptr->name = "DataBlocksOptimizedScanOperator";
pOptr->blockingOptr = false; pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->info = pInfo; pOptr->blockingOptr = false;
pOptr->exec = doTableScan; pOptr->info = pInfo;
pOptr->exec = doTableScan;
return pOptr; return pOptr;
} }
...@@ -4606,8 +4554,7 @@ static SSDataBlock* doAggregate(void* param) { ...@@ -4606,8 +4554,7 @@ static SSDataBlock* doAggregate(void* param) {
setTagVal_rv(pOperator, pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); setTagVal_rv(pOperator, pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// TODO opt perf if (upstream->operatorType == OP_DataBlocksOptScan) {
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
STableScanInfo* pScanInfo = upstream->info; STableScanInfo* pScanInfo = upstream->info;
order = getTableScanOrder(pScanInfo); order = getTableScanOrder(pScanInfo);
} }
...@@ -4660,8 +4607,7 @@ static SSDataBlock* doSTableAggregate(void* param) { ...@@ -4660,8 +4607,7 @@ static SSDataBlock* doSTableAggregate(void* param) {
setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// TODO opt perf if (upstream->operatorType == OP_DataBlocksOptScan) {
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
STableScanInfo* pScanInfo = upstream->info; STableScanInfo* pScanInfo = upstream->info;
order = getTableScanOrder(pScanInfo); order = getTableScanOrder(pScanInfo);
} }
...@@ -4967,8 +4913,8 @@ static SSDataBlock* doFill(void* param) { ...@@ -4967,8 +4913,8 @@ static SSDataBlock* doFill(void* param) {
SFillOperatorInfo *pInfo = pOperator->info; SFillOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes); doFillGapsInResults_rv(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
return pInfo->pRes; return pInfo->pRes;
} }
...@@ -4980,17 +4926,17 @@ static SSDataBlock* doFill(void* param) { ...@@ -4980,17 +4926,17 @@ static SSDataBlock* doFill(void* param) {
return NULL; return NULL;
} }
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, 0, pRuntimeEnv->pQuery->window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQuery->window.ekey);
} else { } else {
pInfo->totalInputRows += pBlock->info.rows; pInfo->totalInputRows += pBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQuery->window.ekey:pBlock->info.window.ekey; int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQuery->window.ekey:pBlock->info.window.ekey;
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey);
taosFillSetInputDataBlock(pRuntimeEnv->pFillInfo, pBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
} }
doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes); doFillGapsInResults_rv(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
} }
} }
...@@ -5037,6 +4983,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5037,6 +4983,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = OP_Aggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5067,6 +5014,7 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5067,6 +5014,7 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
} }
...@@ -5080,7 +5028,7 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5080,7 +5028,7 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) {
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
} }
SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
...@@ -5089,7 +5037,8 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -5089,7 +5037,8 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
initResultRowInfo(&pInfo->binfo.resultRowInfo, tableGroup, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, tableGroup, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableAggregate"; pOperator->name = "MultiTableAggregate";
pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5118,7 +5067,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5118,7 +5067,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOp"; pOperator->name = "ArithmeticOperator";
pOperator->operatorType = OP_Arithmetic;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5139,7 +5089,8 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5139,7 +5089,8 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "LimitOp"; pOperator->name = "LimitOperator";
pOperator->operatorType = OP_Limit;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
...@@ -5156,9 +5107,10 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -5156,9 +5107,10 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pInfo->offset = pRuntimeEnv->pQuery->limit.offset; pInfo->offset = pRuntimeEnv->pQuery->limit.offset;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "OffsetOp"; pOperator->name = "OffsetOperator";
pOperator->operatorType = OP_Offset;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->exec = doOffset; pOperator->exec = doOffset;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5167,7 +5119,7 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -5167,7 +5119,7 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
return pOperator; return pOperator;
} }
SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
...@@ -5176,22 +5128,22 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe ...@@ -5176,22 +5128,22 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashIntervalAgg"; pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = OP_TimeInterval;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doIntervalAgg; pOperator->exec = doIntervalAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator; return pOperator;
} }
SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
...@@ -5199,9 +5151,10 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S ...@@ -5199,9 +5151,10 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableIntervalAggOp"; pOperator->name = "MultiTableTimeIntervalOperator";
pOperator->operatorType = OP_MultiTableTimeInterval;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
...@@ -5214,7 +5167,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S ...@@ -5214,7 +5167,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S
return pOperator; return pOperator;
} }
SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index pInfo->colIndex = -1; // group by column index
...@@ -5223,15 +5176,15 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe ...@@ -5223,15 +5176,15 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashGroupbyAgg"; pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Groupby;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doHashGroupbyAgg; pOperator->exec = doHashGroupbyAgg;
pOperator->cleanup = destroyGroupbyOperatorInfo; pOperator->cleanup = destroyGroupbyOperatorInfo;
...@@ -5241,13 +5194,29 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe ...@@ -5241,13 +5194,29 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput) { int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
{
SQuery* pQuery = pRuntimeEnv->pQuery;
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQuery->fillVal);
STimeWindow w = TSWINDOW_INITIALIZER;
TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey);
TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey);
getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w);
pInfo->pFillInfo = taosCreateFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision,
pQuery->fillType, pColInfo, pRuntimeEnv->qinfo);
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "FillOp"; pOperator->name = "FillOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Fill;
pOperator->upstream = upstream; pOperator->upstream = upstream;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
...@@ -5392,6 +5361,7 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf ...@@ -5392,6 +5361,7 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTableTagScan"; pOperator->name = "SeqTableTagScan";
pOperator->operatorType = OP_TagScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5927,8 +5897,29 @@ static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable ...@@ -5927,8 +5897,29 @@ static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, static UNUSED_FUNC int32_t ddx(SQueryTableMsg* pQueryMsg, SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols, SExprInfo* pExprs,
SColumnInfo* pTagCols) { int32_t numOfOutput, int32_t tagLen, bool superTable) {
for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t functId = pExprs[i].base.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
if (j < 0 || j >= pQueryMsg->numOfCols) {
return TSDB_CODE_QRY_INVALID_MSG;
} else {
SColumnInfo* pCol = &pQueryMsg->colList[j];
int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64,
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, superTable);
assert(ret == TSDB_CODE_SUCCESS);
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) {
*pExprInfo = NULL; *pExprInfo = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -6001,6 +5992,11 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu ...@@ -6001,6 +5992,11 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu
} }
int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64; int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64;
if (type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
}
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes, if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
&pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { &pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
tfree(pExprs); tfree(pExprs);
...@@ -6014,24 +6010,7 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu ...@@ -6014,24 +6010,7 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu
assert(isValidDataType(pExprs[i].type)); assert(isValidDataType(pExprs[i].type));
} }
// TODO refactor // ddx(pQueryMsg, pExprMsg, pTagCols, pExprs, numOfOutput, tagLen, isSuperTable);
for (int32_t i = 0; i < numOfOutput; ++i) {
pExprs[i].base = *pExprMsg[i];
int16_t functId = pExprs[i].base.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
if (j < 0 || j >= pQueryMsg->numOfCols) {
return TSDB_CODE_QRY_INVALID_MSG;
} else {
SColumnInfo *pCol = &pQueryMsg->colList[j];
int32_t ret =
getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64,
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, isSuperTable);
assert(ret == TSDB_CODE_SUCCESS);
}
}
}
*pExprInfo = pExprs; *pExprInfo = pExprs;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -6085,25 +6064,6 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu ...@@ -6085,25 +6064,6 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
assert(isValidDataType(pExprs[i].type)); assert(isValidDataType(pExprs[i].type));
} }
// // TODO refactor
// for (int32_t i = 0; i < numOfOutput; ++i) {
// pExprs[i].base = *pExprMsg[i];
// int16_t functId = pExprs[i].base.functionId;
//
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
// if (j < 0 || j >= pQueryMsg->numOfCols) {
// return TSDB_CODE_QRY_INVALID_MSG;
// } else {
// SColumnInfo *pCol = &pQueryMsg->colList[j];
// int32_t ret =
// getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64,
// &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, isSuperTable);
// assert(ret == TSDB_CODE_SUCCESS);
// }
// }
// }
*pExprInfo = pExprs; *pExprInfo = pExprs;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -6312,29 +6272,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6312,29 +6272,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
goto _cleanup; goto _cleanup;
} }
// prepare the result buffer
// pQuery->sdata = (tFilePage **)calloc(pQuery->numOfOutput, POINTER_BYTES);
// if (pQuery->sdata == NULL) {
// goto _cleanup;
// }
// for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
// // allocate additional memory for interResults that are usually larger then final results
// // TODO refactor
// int16_t bytes = 0;
// if (pQuery->pExpr2 == NULL || col >= pQuery->numOfExpr2) {
// bytes = pExprs[col].bytes;
// } else {
// bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes);
// }
//
// size_t size = (size_t)((pRuntimeEnv->resultInfo.capacity + 1) * bytes + pExprs[col].interBytes + sizeof(tFilePage));
// pQuery->sdata[col] = (tFilePage *)calloc(1, size);
// if (pQuery->sdata[col] == NULL) {
// goto _cleanup;
// }
// }
if (pQuery->fillType != TSDB_FILL_NONE) { if (pQuery->fillType != TSDB_FILL_NONE) {
pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput);
if (pQuery->fillVal == NULL) { if (pQuery->fillVal == NULL) {
......
...@@ -357,6 +357,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 ...@@ -357,6 +357,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc); pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0); assert(pFillInfo->rowSize > 0);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
pFillInfo->pData[i] = malloc(pFillInfo->pFillCol[i].col.bytes * pFillInfo->alloc);
}
return pFillInfo; return pFillInfo;
} }
...@@ -381,6 +385,10 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { ...@@ -381,6 +385,10 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
tfree(pFillInfo->pTags[i].tagVal); tfree(pFillInfo->pTags[i].tagVal);
} }
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
tfree(pFillInfo->pData[i]);
}
tfree(pFillInfo->pTags); tfree(pFillInfo->pTags);
tfree(pFillInfo->pData); tfree(pFillInfo->pData);
...@@ -415,17 +423,19 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) ...@@ -415,17 +423,19 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
} }
} }
// copy the data into source data buffer
void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
}
}
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) { void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
pFillInfo->pData[i] = pColData->pData; // pFillInfo->pData[i] = pColData->pData;
if (pInput->info.rows > pFillInfo->alloc) {
char* t = realloc(pFillInfo->pData[i], pColData->info.bytes * pInput->info.rows);
assert(t != NULL);
pFillInfo->pData[i] = t;
pFillInfo->alloc = pInput->info.rows;
}
memcpy(pFillInfo->pData[i], pColData->pData, pColData->info.bytes * pInput->info.rows);
} }
} }
...@@ -436,11 +446,15 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* ...@@ -436,11 +446,15 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage*
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
const char* data = pInput->data + pCol->col.offset * pInput->num; const char* data = pInput->data + pCol->col.offset * pInput->num;
if (pFillInfo->pData[i] == NULL) { if (pInput->num > pFillInfo->alloc) {
pFillInfo->pData[i] = calloc(4096, pCol->col.bytes); char* t = realloc(pFillInfo->pData[i], pCol->col.bytes * pInput->num);
assert(t != NULL);
pFillInfo->pData[i] = t;
pFillInfo->alloc = pInput->num;
} }
memcpy(pFillInfo->pData[i], data, pCol->col.bytes * pInput->num); memcpy(pFillInfo->pData[i], data, pCol->col.bytes * pInput->num);
// pFillInfo->pData[i] = (char*) data;
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
......
...@@ -572,9 +572,9 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu ...@@ -572,9 +572,9 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
incNextGroup(pGroupResInfo); incNextGroup(pGroupResInfo);
} }
if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) { // if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) {
SET_STABLE_QUERY_OVER(pRuntimeEnv); // SET_STABLE_QUERY_OVER(pRuntimeEnv);
} // }
int64_t elapsedTime = taosGetTimestampUs() - st; int64_t elapsedTime = taosGetTimestampUs() - st;
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo, qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册