未验证 提交 14f8e606 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10741 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
......@@ -128,7 +128,7 @@ target_include_directories(
set(CMAKE_PROJECT_INCLUDE_BEFORE "${CMAKE_SUPPORT_DIR}/EnableCMP0048.txt.in")
add_subdirectory(zlib)
target_include_directories(
zlib
zlibstatic
PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/zlib
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/zlib
)
......
......@@ -57,7 +57,8 @@ typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rows;
int32_t rowSize;
int32_t numOfCols;
int16_t numOfCols;
int16_t hasVarCol;
union {int64_t uid; int64_t blockId;};
} SDataBlockInfo;
......@@ -96,13 +97,15 @@ typedef struct SColumnInfoData {
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
int64_t tbUid = pBlock->info.uid;
int32_t numOfCols = pBlock->info.numOfCols;
int16_t numOfCols = pBlock->info.numOfCols;
int16_t hasVarCol = pBlock->info.hasVarCol;
int32_t rows = pBlock->info.rows;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, tbUid);
tlen += taosEncodeFixedI32(buf, numOfCols);
tlen += taosEncodeFixedI16(buf, numOfCols);
tlen += taosEncodeFixedI16(buf, hasVarCol);
tlen += taosEncodeFixedI32(buf, rows);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
......@@ -120,7 +123,8 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock)
int32_t sz;
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
buf = taosDecodeFixedI32(buf, &pBlock->info.numOfCols);
buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
......
......@@ -117,7 +117,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
void blockDataClearup(SSDataBlock* pDataBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
......
......@@ -295,19 +295,8 @@ typedef struct SMultiFunctionsDesc {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable);
/**
* If the given name is a valid built-in sql function, the value of true will be returned.
* @param name
* @param len
* @return
*/
int32_t qIsBuiltinFunction(const char* name, int32_t len, bool* scalarFunction);
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
bool qIsAggregateFunction(const char* functionName);
bool qIsSelectivityFunction(const char* functionName);
tExprNode* exprTreeFromBinary(const void* data, size_t size);
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc);
......
......@@ -93,6 +93,8 @@ typedef struct SWindowLogicNode {
int64_t interval;
int64_t offset;
int64_t sliding;
int8_t intervalUnit;
int8_t slidingUnit;
SFillNode* pFill;
} SWindowLogicNode;
......@@ -214,6 +216,8 @@ typedef struct SIntervalPhysiNode {
int64_t interval;
int64_t offset;
int64_t sliding;
int8_t intervalUnit;
int8_t slidingUnit;
SFillNode* pFill;
} SIntervalPhysiNode;
......
......@@ -1059,10 +1059,10 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
// destroyTupleIndex(index);
}
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) {
void blockDataClearup(SSDataBlock* pDataBlock) {
pDataBlock->info.rows = 0;
if (hasVarCol) {
if (pDataBlock->info.hasVarCol) {
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
......@@ -1148,7 +1148,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
......
......@@ -50,7 +50,7 @@ typedef struct SGroupResInfo {
int32_t totalGroup;
int32_t currentGroup;
int32_t index;
SArray* pRows; // SArray<SResultRow*>
SArray* pRows; // SArray<SResultRowPosition*>
bool ordered;
int32_t position;
} SGroupResInfo;
......@@ -67,10 +67,15 @@ typedef struct SResultRow {
char *key; // start key of current result row
} SResultRow;
typedef struct SResultRowPosition {
int32_t pageId;
int32_t offset;
} SResultRowPosition;
typedef struct SResultRowInfo {
SList* pRows;
SResultRow** pResult; // result list
// int16_t type:8; // data type for hash key
SList *pRows;
SResultRowPosition *pPosition;
SResultRow **pResult; // result list
int32_t size; // number of result set
int32_t capacity; // max capacity
int32_t curPos; // current active result row index of pResult list
......@@ -131,7 +136,7 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs
assert(rowOffset >= 0);
int32_t numOfRows = 1;//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
return (char*) page + rowOffset + offset * numOfRows;
}
//bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
......@@ -139,12 +144,7 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs
__filter_func_t getFilterOperator(int32_t lowerOptr, int32_t upperOptr);
SResultRowPool* initResultRowPool(size_t size);
SResultRow* getNewResultRow(SResultRowPool* p);
int64_t getResultRowPoolMemSize(SResultRowPool* p);
void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
typedef struct {
SArray* pResult; // SArray<SResPair>
......
......@@ -240,12 +240,12 @@ typedef struct STaskAttr {
SArray* pUdfInfo; // no need to free
} STaskAttr;
typedef int32_t (*__optr_open_fn_t)(void* param);
typedef SSDataBlock* (*__optr_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
struct SOperatorInfo;
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id
uint64_t subplanId;
......@@ -365,28 +365,6 @@ typedef struct SQInfo {
STaskCostInfo summary;
} SQInfo;
typedef struct STaskParam {
char* sql;
char* tagCond;
char* colCond;
char* tbnameCond;
char* prevResult;
SArray* pTableIdList;
SExprBasicInfo** pExpr;
SExprBasicInfo** pSecExpr;
SExprInfo* pExprs;
SExprInfo* pSecExprs;
SFilterInfo* pFilters;
SColIndex* pGroupColIndex;
SColumnInfo* pTagColumnInfo;
SGroupbyExpr* pGroupbyExpr;
int32_t tableScanOperator;
SArray* pOperator;
struct SUdfInfo* pUdfInfo;
} STaskParam;
enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
......@@ -485,13 +463,12 @@ typedef struct SAggSupporter {
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SGroupResInfo groupResInfo;
SInterval interval;
STimeWindow win;
......@@ -515,23 +492,24 @@ typedef struct SAggOperatorInfo {
typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
SSDataBlock* existDataBlock;
SSDataBlock *existDataBlock;
int32_t threshold;
bool hasVarCol;
} SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
int64_t limit;
int64_t total;
SLimit limit;
int64_t currentOffset;
int64_t currentRows;
} SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo {
int64_t groupTotal;
int64_t currentGroupOffset;
int64_t rowsTotal;
int64_t currentOffset;
SLimit limit;
SLimit slimit;
char** prevRow;
SArray* orderColumnList;
bool hasPrev;
......@@ -563,14 +541,15 @@ typedef struct SGroupbyOperatorInfo {
char* prevData; // previous group by value
} SGroupbyOperatorInfo;
typedef struct SSWindowOperatorInfo {
typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows
int32_t start; // start row index
bool reptScan; // next round scan
} SSWindowOperatorInfo;
} SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo;
......@@ -582,23 +561,6 @@ typedef struct SStateWindowOperatorInfo {
bool reptScan;
} SStateWindowOperatorInfo;
typedef struct SDistinctDataInfo {
int32_t index;
int32_t type;
int32_t bytes;
} SDistinctDataInfo;
typedef struct SDistinctOperatorInfo {
SHashObj* pSet;
SSDataBlock* pRes;
bool recordNullVal; // has already record the null value, no need to try again
int64_t threshold;
int64_t outputCapacity;
int32_t totalBytes;
char* buf;
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo;
bool hasVarCol;
......@@ -630,7 +592,6 @@ typedef struct SOrderOperatorInfo {
SArray *orderInfo;
bool nullFirst;
SSortHandle *pSortHandle;
int32_t bufPageSize;
int32_t numOfRowsInRes;
......@@ -642,27 +603,44 @@ typedef struct SOrderOperatorInfo {
uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo;
typedef struct SDistinctDataInfo {
int32_t index;
int32_t type;
int32_t bytes;
} SDistinctDataInfo;
typedef struct SDistinctOperatorInfo {
SHashObj* pSet;
SSDataBlock* pRes;
bool recordNullVal; // has already record the null value, no need to try again
int64_t threshold;
int64_t outputCapacity;
int32_t totalBytes;
char* buf;
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
......@@ -699,16 +677,13 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo,
int32_t* rowCellInfoOffset);
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger);
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
......
......@@ -115,7 +115,8 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false;
}
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows;
// struct size + data payload + length for each column
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows + pInput->pData->info.numOfCols * sizeof(int32_t);
pBuf->pData = malloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
......
......@@ -59,7 +59,8 @@ int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
pResultRowInfo->capacity = size;
pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
if (pResultRowInfo->pResult == NULL) {
pResultRowInfo->pPosition = calloc(pResultRowInfo->capacity, sizeof(SResultRowPosition));
if (pResultRowInfo->pResult == NULL || pResultRowInfo->pPosition == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -182,22 +183,6 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return rowSize;
}
SResultRowPool* initResultRowPool(size_t size) {
SResultRowPool* p = calloc(1, sizeof(SResultRowPool));
if (p == NULL) {
return NULL;
}
p->numOfElemPerBlock = 128;
p->elemSize = (int32_t) size;
p->blockSize = p->numOfElemPerBlock * p->elemSize;
p->position.pos = 0;
p->pData = taosArrayInit(8, POINTER_BYTES);
return p;
}
SResultRow* getNewResultRow(SResultRowPool* p) {
if (p == NULL) {
return NULL;
......@@ -221,132 +206,6 @@ SResultRow* getNewResultRow(SResultRowPool* p) {
return ptr;
}
int64_t getResultRowPoolMemSize(SResultRowPool* p) {
if (p == NULL) {
return 0;
}
return taosArrayGetSize(p->pData) * p->blockSize;
}
int32_t getNumOfAllocatedResultRows(SResultRowPool* p) {
return (int32_t) taosArrayGetSize(p->pData) * p->numOfElemPerBlock;
}
int32_t getNumOfUsedResultRows(SResultRowPool* p) {
return getNumOfAllocatedResultRows(p) - p->numOfElemPerBlock + p->position.pos;
}
void* destroyResultRowPool(SResultRowPool* p) {
if (p == NULL) {
return NULL;
}
size_t size = taosArrayGetSize(p->pData);
for(int32_t i = 0; i < size; ++i) {
void** ptr = taosArrayGet(p->pData, i);
tfree(*ptr);
}
taosArrayDestroy(p->pData);
tfree(p);
return NULL;
}
void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen) {
uint32_t numOfGroup = (uint32_t) taosArrayGetSize(pRes);
tbufWriteUint32(bw, numOfGroup);
tbufWriteUint16(bw, tagLen);
for(int32_t i = 0; i < numOfGroup; ++i) {
SInterResult* pOne = taosArrayGet(pRes, i);
if (tagLen > 0) {
tbufWriteBinary(bw, pOne->tags, tagLen);
}
uint32_t numOfCols = (uint32_t) taosArrayGetSize(pOne->pResult);
tbufWriteUint32(bw, numOfCols);
for(int32_t j = 0; j < numOfCols; ++j) {
SStddevInterResult* p = taosArrayGet(pOne->pResult, j);
uint32_t numOfRows = (uint32_t) taosArrayGetSize(p->pResult);
tbufWriteUint16(bw, p->colId);
tbufWriteUint32(bw, numOfRows);
for(int32_t k = 0; k < numOfRows; ++k) {
// SResPair v = *(SResPair*) taosArrayGet(p->pResult, k);
// tbufWriteDouble(bw, v.avg);
// tbufWriteInt64(bw, v.key);
}
}
}
}
SArray* interResFromBinary(const char* data, int32_t len) {
SBufferReader br = tbufInitReader(data, len, false);
uint32_t numOfGroup = tbufReadUint32(&br);
uint16_t tagLen = tbufReadUint16(&br);
char* tag = NULL;
if (tagLen > 0) {
tag = calloc(1, tagLen);
}
SArray* pResult = taosArrayInit(4, sizeof(SInterResult));
for(int32_t i = 0; i < numOfGroup; ++i) {
if (tagLen > 0) {
memset(tag, 0, tagLen);
tbufReadToBinary(&br, tag, tagLen);
}
uint32_t numOfCols = tbufReadUint32(&br);
SArray* p = taosArrayInit(numOfCols, sizeof(SStddevInterResult));
for(int32_t j = 0; j < numOfCols; ++j) {
// int16_t colId = tbufReadUint16(&br);
int32_t numOfRows = tbufReadUint32(&br);
// SStddevInterResult interRes = {.colId = colId, .pResult = taosArrayInit(4, sizeof(struct SResPair)),};
for(int32_t k = 0; k < numOfRows; ++k) {
// SResPair px = {0};
// px.avg = tbufReadDouble(&br);
// px.key = tbufReadInt64(&br);
//
// taosArrayPush(interRes.pResult, &px);
}
// taosArrayPush(p, &interRes);
}
char* p1 = NULL;
if (tagLen > 0) {
p1 = malloc(tagLen);
memcpy(p1, tag, tagLen);
}
SInterResult d = {.pResult = p, .tags = p1,};
taosArrayPush(pResult, &d);
}
tfree(tag);
return pResult;
}
void freeInterResult(void* param) {
SInterResult* pResult = (SInterResult*) param;
tfree(pResult->tags);
int32_t numOfCols = (int32_t) taosArrayGetSize(pResult->pResult);
for(int32_t i = 0; i < numOfCols; ++i) {
SStddevInterResult *p = taosArrayGet(pResult->pResult, i);
taosArrayDestroy(p->pResult);
}
taosArrayDestroy(pResult->pResult);
}
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
assert(pGroupResInfo != NULL);
......@@ -360,7 +219,7 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo)
taosArrayDestroy(pGroupResInfo->pRows);
}
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES);
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pPosition, pResultInfo->size, sizeof(SResultRowPosition));
pGroupResInfo->index = 0;
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
}
......
......@@ -26,15 +26,16 @@
#include "tsort.h"
#include "ttime.h"
#include "../../function/inc/taggfunction.h"
#include "executorimpl.h"
#include "function.h"
#include "query.h"
#include "tcompare.h"
#include "tcompression.h"
#include "thash.h"
#include "tsdb.h"
#include "ttypes.h"
#include "query.h"
#include "vnode.h"
#include "tsdb.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
......@@ -211,6 +212,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
......@@ -227,15 +229,14 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
static int32_t operatorDummyOpenFn(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static int32_t operatorDummyOpenFn(SOperatorInfo *pOperator) {
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
......@@ -444,10 +445,12 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pResultRowInfo->pPosition = realloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
pResultRowInfo->pResult = (SResultRow **)t;
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc);
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition));
pResultRowInfo->capacity = (int32_t)newCapacity;
}
......@@ -564,7 +567,47 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
return pResultRowInfo->pResult[pResultRowInfo->curPos];
}
static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t tid, char* pData, int16_t bytes,
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
SFilePage *pData = NULL;
// in the first scan, new space needed for results
int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
pData->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
pData = getBufPage(pResultBuf, getPageId(pi));
pageId = getPageId(pi);
if (pData->num + interBufSize + sizeof(SResultRow) > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi);
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
if (pData != NULL) {
pData->num = sizeof(SFilePage);
}
}
}
if (pData == NULL) {
return NULL;
}
// set the number of rows in current disk page
SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
pResultRow->pageId = pageId;
pResultRow->offset = (int32_t)pData->num;
pData->num += interBufSize + sizeof(SResultRow);
return pResultRow;
}
static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t tid, char* pData, int16_t bytes,
bool masterscan, uint64_t tableGroupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) {
bool existed = false;
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, tableGroupId);
......@@ -608,7 +651,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int
SResultRow *pResult = NULL;
if (p1 == NULL) {
pResult = getNewResultRow(pSup->pool);
pResult = getNewResultRow_rv(pResultBuf, tableGroupId, pSup->resultRowSize);
int32_t ret = initResultRow(pResult);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -623,6 +666,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int
}
pResultRowInfo->curPos = pResultRowInfo->size;
pResultRowInfo->pPosition[pResultRowInfo->size] = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset};
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
int64_t index = pResultRowInfo->curPos;
......@@ -742,6 +786,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tid, &pageId);
pData->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
pData = getBufPage(pResultBuf, getPageId(pi));
......@@ -750,9 +795,10 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes
if (pData->num + size > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi);
pData = getNewBufPage(pResultBuf, tid, &pageId);
if (pData != NULL) {
assert(pData->num == 0); // number of elements must be 0 for new allocated buffer
pData->num = sizeof(SFilePage);
}
}
}
......@@ -812,9 +858,9 @@ static void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pR
static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_t id, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset, SDiskbasedBuf *pBuf, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) {
int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) {
assert(win->skey <= win->ekey);
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId,
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pAggSup->pResultBuf, pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId,
pTaskInfo, true, pAggSup);
if (pResultRow == NULL) {
......@@ -822,19 +868,10 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_
return TSDB_CODE_SUCCESS;
}
// not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) { // todo intermediate result size
int32_t ret = addNewWindowResultBuf(pResultRow, pBuf, (int32_t) tableGroupId, 0);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
}
// set time window for current result
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
setResultRowOutputBufInitCtx_rv(pAggSup->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
......@@ -990,18 +1027,19 @@ static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo *pDataBlockInfo, TSKEY *p
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].size = forwardStep;
pCtx[k].startTs = pWin->skey;
// keep it temporarialy
int32_t startOffset = pCtx[k].startRow;
bool hasAgg = pCtx[k].isAggSet;
int32_t startOffset = pCtx[k].input.startRowIndex;
bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1);
pCtx[k].startRow = pos;
pCtx[k].input.startRowIndex = pos;
pCtx[k].input.numOfRows = forwardStep;
if (tsCol != NULL) {
pCtx[k].ptsList = &tsCol[pos];
pCtx[k].ptsList = tsCol;
}
// not a whole block involved in query processing, statistics data can not be used
......@@ -1011,12 +1049,13 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of
}
if (functionNeedToExecute(&pCtx[k])) {
// pCtx[k].fpSet.process(&pCtx[k]);
pCtx[k].fpSet.process(&pCtx[k]);
}
// restore it
pCtx[k].isAggSet = hasAgg;
pCtx[k].startRow = startOffset;
pCtx[k].input.colDataAggIsSet = hasAgg;
pCtx[k].input.startRowIndex = startOffset;
pCtx[k].input.numOfRows = numOfRows;
}
}
......@@ -1236,26 +1275,15 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
}
}
static void projectApplyFunctions(STaskRuntimeEnv *pRuntimeEnv, SqlFunctionCtx *pCtx, int32_t numOfOutput) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
static void projectApplyFunctions(SqlFunctionCtx *pCtx, int32_t numOfOutput) {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pQueryAttr->window.skey;
// Always set the asc order for merge stage process
if (pCtx[k].currentStage == MERGE_STAGE) {
pCtx[k].order = TSDB_ORDER_ASC;
}
pCtx[k].startTs = pQueryAttr->window.skey;
if (pCtx[k].functionId < 0) {
// load the script and exec
// SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
// doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
// } else {
// pCtx[k].fpSet.process(&pCtx[k]);
// aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
}
// }
}
}
......@@ -1451,8 +1479,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = (int64_t*) pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
......@@ -1463,7 +1490,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
SResultRow* pResult = NULL;
int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1485,7 +1512,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow w = pRes->win;
ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult,
tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1503,7 +1530,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// restore current time window
ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1523,7 +1550,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// null data, failed to allocate more memory buffer
int32_t code = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &nextWin, masterScan, &pResult, tableGroupId,
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1544,7 +1571,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
}
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
......@@ -1704,7 +1730,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
tfree(pInfo->prevData);
}
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
......@@ -2041,9 +2067,7 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI
return pFuncCtx;
}
static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowCellInfoOffset) {
size_t numOfOutput = taosArrayGetSize(pExprInfo);
static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) {
SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pFuncCtx == NULL) {
return NULL;
......@@ -2056,7 +2080,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
}
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
SExprInfo* pExpr = &pExprInfo[i];
SExprBasicInfo *pFunct = &pExpr->base;
SqlFunctionCtx* pCtx = &pFuncCtx[i];
......@@ -2873,6 +2897,8 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo,
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
*status = BLK_DATA_ALL_NEEDED;
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
if (pBlock->pDataBlock == NULL) {
return terrno;
......@@ -3289,10 +3315,9 @@ void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
}
}
// TODO fix this bug.
int32_t initResultRow(SResultRow *pResultRow) {
pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
pResultRow->pageId = -1;
pResultRow->offset = -1;
return TSDB_CODE_SUCCESS;
}
......@@ -3348,7 +3373,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
int64_t tid = 0;
int64_t groupId = 0;
SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup);
SResultRow* pRow = doSetResultOutBufByKey_rv(pSup->pResultBuf, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
......@@ -3412,11 +3437,11 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput)
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) {
needCopyTs = true;
if (i > 0 && pCtx[i-1].functionId == FUNCTION_TS_DUMMY){
if (i > 0 && pCtx[i-1].functionId == FUNCTION_TS_DUMMY) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data
src = pColRes->pData;
}
}else if(functionId == FUNCTION_TS_DUMMY) {
} else if(functionId == FUNCTION_TS_DUMMY) {
tsNum++;
}
}
......@@ -3489,48 +3514,44 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SqlFunctionCt
pTableScanInfo->reverseTimes = 0;
}
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
int32_t numOfOutput = pOperator->numOfOutput;
// if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
// // for each group result, call the finalize function for each column
// if (pQueryAttr->groupbyColumn) {
// closeAllResultRows(pResultRowInfo);
// }
//
// for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
// SResultRow *buf = pResultRowInfo->pResult[i];
// if (!isResultRowClosed(pResultRowInfo, i)) {
// continue;
// }
//
// setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset);
//
// for (int32_t j = 0; j < numOfOutput; ++j) {
//// pCtx[j].startTs = buf->win.skey;
//// if (pCtx[j].functionId < 0) {
//// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
//// } else {
//// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
//// }
// }
//
//
// /*
// * set the number of output results for group by normal columns, the number of output rows usually is 1 except
// * the top and bottom query
// */
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
// }
//
// } else {
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t j = 0; j < numOfOutput; ++j) {
// if (pCtx[j].functionId < 0) {
// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
// } else {
pCtx[j].fpSet.finalize(&pCtx[j]);
// }
}
// }
}
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf *pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRowPosition* pPos = &pResultRowInfo->pPosition[i];
SFilePage* bufPage = getBufPage(pBuf, pPos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset);
if (!isResultRowClosed(pResultRowInfo, i)) {
continue;
}
for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
continue;
}
pCtx[j].fpSet.finalize(&pCtx[j]);
if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes;
}
}
releaseBufPage(pBuf, bufPage);
/*
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
* the top and bottom query
*/
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
}
}
static bool hasMainOutput(STaskAttr *pQueryAttr) {
......@@ -3625,31 +3646,29 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult,
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* bufPage = getBufPage(pBuf, pResult->pageId);
int32_t offset = 0;
// int32_t offset = 0;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
offset += pCtx[i].resDataInfo.bytes;
// offset += pCtx[i].resDataInfo.bytes;
continue;
}
pCtx[i].pOutput = getPosInResultPage_rv(bufPage, pResult->offset, offset);
offset += pCtx[i].resDataInfo.bytes;
// offset += pCtx[i].resDataInfo.bytes;
int32_t functionId = pCtx[i].functionId;
if (functionId < 0) {
continue;
}
// int32_t functionId = pCtx[i].functionId;
// if (functionId < 0) {
// continue;
// }
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
// }
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
if (!pResInfo->initialized) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
}
// if (!pResInfo->initialized) {
// aAggs[functionId].init(&pCtx[i], pResInfo);
// }
}
}
......@@ -3663,7 +3682,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
SResultRow* pResultRow =
doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, &pAggInfo->aggSup);
doSetResultOutBufByKey_rv(pAggInfo->pResultBuf, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, &pAggInfo->aggSup);
assert (pResultRow != NULL);
/*
......@@ -3904,8 +3923,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) {
* @param pQInfo
* @param result
*/
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity) {
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
......@@ -3923,13 +3941,19 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
step = -1;
}
int32_t nrows = pBlock->info.rows;
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
SResultRow* pRow = taosArrayGetP(pGroupResInfo->pRows, i);
SResultRowPosition* pPos = taosArrayGet(pGroupResInfo->pRows, i);
SFilePage *page = getBufPage(pBuf, pPos->pageId);
SResultRow* pRow = (SResultRow*)((char*)page + pPos->offset);
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
continue;
}
// TODO copy multiple rows?
int32_t numOfRowsToCopy = pRow->numOfRows;
if (numOfResult + numOfRowsToCopy >= rowCapacity) {
break;
......@@ -3937,20 +3961,17 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
pGroupResInfo->index += 1;
SFilePage *page = getBufPage(pBuf, pRow->pageId);
int32_t offset = 0;
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
int32_t bytes = pColInfoData->info.bytes;
char *out = pColInfoData->pData + numOfResult * bytes;
char *in = getPosInResultPage_rv(page, pRow->offset, offset);
memcpy(out, in, bytes * numOfRowsToCopy);
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
offset += bytes;
char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
colDataAppend(pColInfoData, nrows, in, pEntryInfo->numOfRes == 0);
}
releaseBufPage(pBuf, page);
nrows += 1;
numOfResult += numOfRowsToCopy;
if (numOfResult == rowCapacity) { // output buffer is full
break;
......@@ -3962,16 +3983,16 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
return 0;
}
static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity) {
static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
pBlock->info.rows = 0;
blockDataClearup(pBlock);
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
return;
}
int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity);
doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset);
// add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow(pBlock);
......@@ -4681,9 +4702,7 @@ static void doCloseAllTimeWindow(STaskRuntimeEnv* pRuntimeEnv) {
}
}
static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
static SSDataBlock* doTableScanImpl(SOperatorInfo *pOperator, bool* newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
......@@ -4713,7 +4732,7 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
// }
// this function never returns error?
uint32_t status;
uint32_t status = BLK_DATA_ALL_NEEDED;
int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4731,9 +4750,7 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
return NULL;
}
static SSDataBlock* doTableScan(void* param, bool *newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doTableScan(SOperatorInfo *pOperator, bool *newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
......@@ -4804,8 +4821,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
return p;
}
static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*)param;
static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -4852,9 +4868,7 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
#endif
}
static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) {
// NOTE: this operator never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
......@@ -5170,8 +5184,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
}
}
static int32_t prepareLoadRemoteData(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
static int32_t prepareLoadRemoteData(SOperatorInfo *pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
......@@ -5190,15 +5203,12 @@ static int32_t prepareLoadRemoteData(void* param) {
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
static SSDataBlock* doLoadRemoteData(SOperatorInfo *pOperator, bool* newgroup) {
SExchangeInfo *pExchangeInfo = pOperator->info;
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
int32_t code = pOperator->_openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
......@@ -5248,7 +5258,6 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
// TODO handle the error
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -5259,11 +5268,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
size_t numOfSources = LIST_LENGTH(pSources);
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
if (pInfo->pSources == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
goto _error;
}
for(int32_t i = 0; i < numOfSources; ++i) {
......@@ -5271,16 +5278,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
taosArrayPush(pInfo->pSources, pNode);
}
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
tfree(pInfo);
tfree(pOperator);
taosArrayDestroy(pInfo->pSources);
taosArrayDestroy(pInfo->pSourceDataInfo);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
int32_t code = initDataSource(numOfSources, pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -5331,12 +5328,12 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
_error:
if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, 0);
destroyExchangeOperatorInfo(pInfo, numOfSources);
}
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......@@ -5375,7 +5372,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
......@@ -5497,9 +5494,8 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
tsem_post(&pSourceDataInfo->pEx->ready);
}
static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
// build message and send to mnode to fetch the content of system tables.
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
......@@ -5638,7 +5634,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_SessionWindow) {
SSWindowOperatorInfo* pInfo = pDownstream->info;
SSessionAggOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
......@@ -5730,8 +5726,8 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
return pOrderColumns;
}
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput);
static void clearupAggSup(SAggSupporter* pAggSup);
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput, const char* pKey);
static void cleanupAggSup(SAggSupporter* pAggSup);
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param;
......@@ -5743,7 +5739,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
}
blockDataDestroy(pInfo->binfo.pRes);
clearupAggSup(&pInfo->aggSup);
cleanupAggSup(&pInfo->aggSup);
}
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -5798,7 +5794,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan
}
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) {
blockDataClearup(pDataBlock, hasVarCol);
blockDataClearup(pDataBlock);
while(1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
......@@ -5954,7 +5950,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
while(1) {
blockDataClearup(pDataBlock, pInfo->hasVarCol);
blockDataClearup(pDataBlock);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
......@@ -5989,8 +5985,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL;
}
static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6024,7 +6019,7 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
return doMerge(pOperator);
}
static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
static SArray* createBlockOrder(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal) {
SArray* pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
size_t numOfOrder = taosArrayGetSize(pOrderVal);
......@@ -6033,8 +6028,8 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
SOrder* pOrder = taosArrayGet(pOrderVal, j);
orderInfo.order = pOrder->order;
for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGet(pExprInfo, i);
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pExprInfo[i];
if (pExpr->base.resSchema.colId == pOrder->col.colId) {
orderInfo.colIndex = i;
break;
......@@ -6047,7 +6042,7 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
return pOrderInfo;
}
static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeOperatorInfo* pInfo) {
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo, SSortedMergeOperatorInfo* pInfo) {
if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
return 0;
}
......@@ -6063,8 +6058,8 @@ static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeO
size_t numOfGroupCol = taosArrayGetSize(pInfo->groupInfo);
for(int32_t i = 0; i < numOfGroupCol; ++i) {
SColumn* pCol = taosArrayGet(pGroupInfo, i);
for(int32_t j = 0; j < taosArrayGetSize(pExprInfo); ++j) {
SExprInfo* pe = taosArrayGet(pExprInfo, j);
for(int32_t j = 0; j < numOfCols; ++j) {
SExprInfo* pe = &pExprInfo[j];
if (pe->base.resSchema.colId == pCol->colId) {
taosArrayPush(plist, pCol);
taosArrayPush(pInfo->groupInfo, &j);
......@@ -6095,29 +6090,27 @@ static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeO
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) {
SSortedMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortedMergeOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t numOfOutput = taosArrayGetSize(pExprInfo);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) {
goto _error;
}
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfOutput);
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
code = initGroupCol(pExprInfo, pGroupInfo, pInfo);
code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -6126,7 +6119,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
// pRuntimeEnv->pQueryAttr->topBotQuery, false));
pInfo->sortBufSize = 1024 * 16; // 1MB
pInfo->bufPageSize = 1024;
pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);
pInfo->orderInfo = createBlockOrder(pExprInfo, num, pOrderVal);
pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);
......@@ -6135,8 +6128,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = num;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSortedMerge;
......@@ -6151,7 +6144,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
_error:
if (pInfo != NULL) {
destroySortedMergeOperatorInfo(pInfo, numOfOutput);
destroySortedMergeOperatorInfo(pInfo, num);
}
tfree(pInfo);
......@@ -6160,8 +6153,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
return NULL;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6194,12 +6186,12 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
}
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) {
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
......@@ -6208,12 +6200,10 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes);
pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);
pInfo->orderInfo = createBlockOrder(pExprInfo, numOfCols, pOrderVal);
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
if (IS_VAR_DATA_TYPE(pExpr->base.resSchema.type)) {
for(int32_t i = 0; i < numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(pExprInfo[i].base.resSchema.type)) {
pInfo->hasVarCol = true;
break;
}
......@@ -6221,7 +6211,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) {
tfree(pOperator);
destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo));
destroyOrderOperatorInfo(pInfo, numOfCols);
tfree(pInfo);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
......@@ -6247,10 +6237,9 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
}
// this is a blocking operator
static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
SAggOperatorInfo* pAggInfo = pOperator->info;
......@@ -6259,34 +6248,52 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) {
bool newgroup = true;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
// if (pAggInfo->current != NULL) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// }
// if (pAggInfo->current != NULL) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, 0, pInfo->pCtx);
}
doSetOperatorCompleted(pOperator);
finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput);
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup) {
SAggOperatorInfo *pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, pInfo->pRes);
doSetOperatorCompleted(pOperator);
return (blockDataGetNumOfRows(pInfo->pRes) != 0)? pInfo->pRes:NULL;
}
static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6296,7 +6303,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity, pAggInfo->binfo.rowCellInfoOffset);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
......@@ -6345,7 +6352,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo);
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity, pAggInfo->binfo.rowCellInfoOffset);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
......@@ -6354,55 +6361,50 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
return pInfo->pRes;
}
static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes;
int32_t order = pRuntimeEnv->pQueryAttr->order.order;
pRes->info.rows = 0;
blockDataClearup(pRes);
if (pProjectInfo->existDataBlock) { // TODO refactor
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
pProjectInfo->existDataBlock = NULL;
*newgroup = true;
// todo dynamic set tags
if (pTableQueryInfo != NULL) {
// if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
}
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
projectApplyFunctions(pInfo->pCtx, pOperator->numOfOutput);
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
if (pRes->info.rows >= pProjectInfo->threshold) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
return pRes;
}
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) {
bool prevVal = *newgroup;
// The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
assert(*newgroup == false);
*newgroup = prevVal;
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break;
......@@ -6421,75 +6423,76 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
}
}
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// todo dynamic set tags
if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
}
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL);
if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) {
projectApplyFunctions(pInfo->pCtx, pOperator->numOfOutput);
// pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, pRes);
if (pRes->info.rows >= pProjectInfo->threshold) {
break;
}
}
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
// resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
}
static SSDataBlock* doLimit(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
static SSDataBlock* doLimit(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SLimitOperatorInfo* pInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
return NULL;
}
if (pRuntimeEnv->currentOffset == 0) {
if (pInfo->currentOffset == 0) {
break;
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
} else if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else { // TODO handle the data movement
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes);
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
}
pRuntimeEnv->currentOffset = 0;
pInfo->currentOffset = 0;
break;
}
}
if (pInfo->total + pBlock->info.rows >= pInfo->limit) {
pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total);
pInfo->total = pInfo->limit;
if (pInfo->currentRows + pBlock->info.rows >= pInfo->limit.limit) {
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->currentRows);
pInfo->currentRows = pInfo->limit.limit;
doSetOperatorCompleted(pOperator);
} else {
pInfo->total += pBlock->info.rows;
pInfo->currentRows += pBlock->info.rows;
}
return pBlock;
......@@ -6526,54 +6529,57 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
return NULL;
}
static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
STableIntervalOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pInfo->binfo.pRes;
}
// int32_t order = pQueryAttr->order.order;
// STimeWindow win = pQueryAttr->window;
// int32_t order = pQueryAttr->order.order;
// STimeWindow win = pQueryAttr->window;
bool newgroup = false;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
}
// restore the value
// pQueryAttr->order.order = order;
// pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity);
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
STableIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
......@@ -6582,8 +6588,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
return pInfo->binfo.pRes->info.rows == 0? NULL:pInfo->binfo.pRes;
}
static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6630,7 +6635,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pIntervalInfo->binfo.resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->binfo.pCtx, &pIntervalInfo->binfo.resultRowInfo, pIntervalInfo->binfo.rowCellInfoOffset);
finalizeQueryResult(pIntervalInfo->binfo.pCtx, pOperator->numOfOutput);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->binfo.resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
......@@ -6642,8 +6647,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->binfo.pRes->info.rows == 0? NULL:pIntervalInfo->binfo.pRes;
}
static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6702,8 +6706,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->binfo.pRes;
}
static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6836,8 +6839,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
// pSDataBlock->info.rows, pOperator->numOfOutput);
}
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doStateWindowAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6882,7 +6884,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
......@@ -6894,32 +6896,24 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSWindowOperatorInfo* pWindowInfo = pOperator->info;
SSessionAggOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes;
}
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//pQueryAttr->order.order = TSDB_ORDER_ASC;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) {
......@@ -6931,31 +6925,26 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order);
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order);
doSessionWindowAggImpl(pOperator, pWindowInfo, pBlock);
}
// restore the value
pQueryAttr->order.order = order;
pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
// initGroupResInfo(&pBInfo->groupResInfo, &pBInfo->resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -6998,7 +6987,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows
finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
} else {
updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
}
......@@ -7044,9 +7033,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRunti
}
}
static SSDataBlock* doFill(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
SFillOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
......@@ -7152,39 +7139,50 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
tfree(pOperator);
}
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput) {
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput, const char* pKey) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES);
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
pAggSup->pool = initResultRowPool(pAggSup->resultRowSize);
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL ||
pAggSup->pResultRowHashTable == NULL || pAggSup->pool == NULL) {
pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, 4096, 4096 * 256, pKey, "/tmp/");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return TSDB_CODE_SUCCESS;
}
static void clearupAggSup(SAggSupporter* pAggSup) {
static void cleanupAggSup(SAggSupporter* pAggSup) {
tfree(pAggSup->keyBuf);
taosHashCleanup(pAggSup->pResultRowHashTable);
taosHashCleanup(pAggSup->pResultRowListSet);
taosArrayDestroy(pAggSup->pResultRowArrayList);
destroyResultRowPool(pAggSup->pool);
destroyDiskbasedBuf(pAggSup->pResultBuf);
}
static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows, SSDataBlock* pResultBlock, const STableGroupInfo* pTableGroupInfo) {
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = pResultBlock;
pInfo->binfo.capacity = numOfRows;
static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey) {
pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pRes = pResultBlock;
pBasicInfo->capacity = numOfRows;
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, taosArrayGetSize(pExprInfo));
pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, pkey);
}
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
STableQueryInfo* pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pTableQueryInfo == NULL) {
return NULL;
}
int32_t index = 0;
for(int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) {
......@@ -7192,7 +7190,7 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n
for(int32_t j = 0; j < taosArrayGetSize(pa); ++j) {
STableKeyInfo* pk = taosArrayGet(pa, j);
STableQueryInfo* pTQueryInfo = &pInfo->pTableQueryInfo[index++];
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++];
pTQueryInfo->uid = pk->uid;
pTQueryInfo->lastKey = pk->lastKey;
pTQueryInfo->groupIndex = i;
......@@ -7200,36 +7198,53 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n
}
STimeWindow win = {0, INT64_MAX};
createTableQueryInfo(pInfo->pTableQueryInfo, false, win);
return TSDB_CODE_SUCCESS;
createTableQueryInfo(pTableQueryInfo, false, win);
return pTableQueryInfo;
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock,
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t numOfRows = 1;
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
int32_t numOfRows = 1;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
goto _error;
}
initAggInfo(pInfo, pExprInfo, numOfRows, pResultBlock, pTableGroupInfo);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doAggregate;
pOperator->_openFn = doOpenAggregateOptr;
pOperator->getNextFn = getAggregateResult;
pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
destroyAggOperatorInfo(pInfo, numOfCols);
tfree(pInfo);
tfree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
......@@ -7242,33 +7257,41 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param;
doDestroyBasicInfo(pInfo, numOfOutput);
}
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData);
}
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param;
void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupAggSup(&pInfo->aggSup);
}
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->p);
}
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData);
......@@ -7325,12 +7348,15 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
tsem_destroy(&pExInfo->ready);
}
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1;
size_t numOfOutput = taosArrayGetSize(pExprInfo);
initAggInfo(pInfo, pExprInfo, numOfRows, pResBlock, pTableGroupInfo);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
goto _error;
}
size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup);
......@@ -7341,41 +7367,62 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doMultiTableAggregate;
pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
return NULL;
}
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t numOfRows = 4096;
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = pResBlock;
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pCtx == NULL) {
goto _error;
}
// initResultRowInfo(&pBInfo->resultRowInfo, 8);
// setFunctionResultOutput(pBInfo, MAIN_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ProjectOperator";
// pOperator->operatorType = OP_Project;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = num;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doProjectOperation;
pOperator->closeFn = destroyProjectOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_OUT_OF_MEMORY) {
goto _error;
}
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
......@@ -7411,58 +7458,86 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
return 0;
}
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream) {
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
ASSERT(numOfDownstream == 1);
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->limit = *pLimit;
pInfo->currentOffset = pLimit->offset;
pOperator->name = "LimitOperator";
// pOperator->operatorType = OP_Limit;
// pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LIMIT;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
size_t numOfOutput = taosArrayGetSize(pExprInfo);
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfOutput);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->precision = TSDB_TIME_PRECISION_MICRO;
pInfo->win = pTaskInfo->window;
pInfo->interval = *pInterval;
int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
pInfo->win.skey = INT64_MIN;
pInfo->win.ekey = INT64_MAX;
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
int32_t numOfRows = 4096;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS/* || pInfo->pTableQueryInfo == NULL*/) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TimeIntervalAggOperator";
// pOperator->operatorType = OP_TimeWindow;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->_openFn = doOpenIntervalAgg;
pOperator->getNextFn = doIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
pOperator->closeFn = destroyIntervalOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
tfree(pInfo);
tfree(pOperator);
pTaskInfo->code = code;
return NULL;
}
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
......@@ -7512,30 +7587,48 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = calloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pInfo->binfo.pRes = pResBlock;
pInfo->prevTs = INT64_MIN;
pInfo->reptScan = false;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SessionWindowAggOperator";
// pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doSessionWindowAgg;
pOperator->closeFn = destroySWindowOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
if (pInfo != NULL) {
destroySWindowOperatorInfo(pInfo, numOfCols);
}
tfree(pInfo);
tfree(pOperator);
pTaskInfo->code = code;
return NULL;
}
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
......@@ -7660,15 +7753,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
pInfo->slimit = pQueryAttr->slimit;
pInfo->limit = pQueryAttr->limit;
pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
pInfo->threshold = (int64_t)(pInfo->capacity * 0.8);
pInfo->currentOffset = pQueryAttr->limit.offset;
pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
// pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
// pInfo->slimit = pQueryAttr->slimit;
// pInfo->limit = pQueryAttr->limit;
// pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
// pInfo->threshold = (int64_t)(pInfo->capacity * 0.8);
// pInfo->currentOffset = pQueryAttr->limit.offset;
// pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
pInfo->multigroupResult= multigroupResult;
// TODO refactor
......@@ -7704,7 +7795,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator;
}
static SSDataBlock* doTagScan(void* param, bool* newgroup) {
static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) {
#if 0
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -7909,8 +8000,7 @@ static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBl
}
}
static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
static SSDataBlock* hashDistinct(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -8113,12 +8203,14 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
return s;
}
SArray* createExprInfo(SAggPhysiNode* pPhyNode) {
int32_t numOfAggFuncs = LIST_LENGTH(pPhyNode->pAggFuncs);
SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) {
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
*numOfExprs = numOfFuncs;
SExprInfo* pExprs = calloc(numOfFuncs, sizeof(SExprInfo));
SArray* pArray = taosArrayInit(numOfAggFuncs, POINTER_BYTES);
for(int32_t i = 0; i < numOfAggFuncs; ++i) {
SExprInfo* pExp = calloc(1, sizeof(SExprInfo));
for(int32_t i = 0; i < numOfFuncs; ++i) {
SExprInfo* pExp = &pExprs[i];
pExp->pExpr = calloc(1, sizeof(tExprNode));
pExp->pExpr->_function.num = 1;
......@@ -8129,7 +8221,7 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode) {
pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn));
SColumn* pCol = pExp->base.pParam[0].pCol;
STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pPhyNode->pAggFuncs, i);
STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pNodeList, i);
ASSERT(pTargetNode->slotId == i);
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
......@@ -8154,10 +8246,9 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode) {
pCol->precision = pcn->node.resType.precision;
pCol->dataBlockId = pcn->dataBlockId;
}
taosArrayPush(pArray, &pExp);
}
return pArray;
return pExprs;
}
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
......@@ -8217,7 +8308,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
}
}
if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
......@@ -8225,9 +8316,40 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo);
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SAggPhysiNode*)pPhyNode)->pAggFuncs, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = 'a', .slidingUnit = 'a'};
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
}
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
......
......@@ -37,7 +37,6 @@ typedef struct SSortHandle {
SArray *pOrderInfo;
bool nullFirst;
bool hasVarCol;
SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp;
......@@ -77,6 +76,10 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
colInfo.info.bytes = pSchema[i].bytes;
colInfo.info.colId = pSchema[i].colId;
taosArrayPush(pBlock->pDataBlock, &colInfo);
if (IS_VAR_DATA_TYPE(colInfo.info.type)) {
pBlock->info.hasVarCol = true;
}
}
return pBlock;
......@@ -155,7 +158,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
while(start < pDataBlock->info.rows) {
int32_t stop = 0;
blockDataSplitRows(pDataBlock, pHandle->hasVarCol, start, &stop, pHandle->pageSize);
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
if (p == NULL) {
return terrno;
......@@ -179,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
start = stop + 1;
}
blockDataClearup(pDataBlock, pHandle->hasVarCol);
blockDataClearup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
......@@ -309,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
}
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
blockDataClearup(pHandle->pDataBlock, pHandle->hasVarCol);
blockDataClearup(pHandle->pDataBlock);
while(1) {
if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
......@@ -475,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
blockDataClearup(pDataBlock, pHandle->hasVarCol);
blockDataClearup(pDataBlock);
}
tMergeTreeDestroy(pHandle->pMergeTree);
......
......@@ -55,8 +55,7 @@ typedef struct SDummyInputInfo {
SSDataBlock* pBlock;
} SDummyInputInfo;
SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(param);
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;
......@@ -87,7 +86,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
//
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
} else {
blockDataClearup(pInfo->pBlock, true);
blockDataClearup(pInfo->pBlock);
}
SSDataBlock* pBlock = pInfo->pBlock;
......@@ -122,8 +121,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
return pBlock;
}
SSDataBlock* get2ColsDummyBlock(void* param, bool* newgroup) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(param);
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;
......@@ -153,7 +151,7 @@ SSDataBlock* get2ColsDummyBlock(void* param, bool* newgroup) {
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
} else {
blockDataClearup(pInfo->pBlock, false);
blockDataClearup(pInfo->pBlock);
}
SSDataBlock* pBlock = pInfo->pBlock;
......
......@@ -46,13 +46,6 @@ extern SAggFunctionInfo aggFunc[35];
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
#define TOP_BOTTOM_QUERY_LIMIT 100
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
#define QUERY_IS_JOIN_QUERY(type) (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_QUERY))
#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0)
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
typedef struct SInterpInfoDetail {
TSKEY ts; // interp specified timestamp
int8_t type;
......@@ -61,9 +54,6 @@ typedef struct SInterpInfoDetail {
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
typedef struct STwaInfo {
int8_t hasResult; // flag to denote has value
double dOutput;
......@@ -71,8 +61,6 @@ typedef struct STwaInfo {
STimeWindow win;
} STwaInfo;
extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval);
/**
......
......@@ -52,10 +52,6 @@ static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(p
void functionFinalizer(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) {
// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
}
doFinalizer(pResInfo);
}
......@@ -398,7 +394,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int32_t *pData = (int32_t*)pCol->pData;
int32_t *val = (int32_t*) buf;
for (int32_t i = 0; i < pCtx->size; ++i) {
for (int32_t i = start; i < start + numOfRows; ++i) {
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
......
#include "os.h"
#include "tarray.h"
#include "function.h"
#include "thash.h"
#include "taggfunction.h"
static SHashObj* functionHashTable = NULL;
static SHashObj* udfHashTable = NULL;
static void doInitFunctionHashTable() {
int numOfEntries = tListLen(aggFunc);
functionHashTable = taosHashInit(numOfEntries, MurmurHash3_32, false, false);
for (int32_t i = 0; i < numOfEntries; i++) {
int32_t len = (uint32_t)strlen(aggFunc[i].name);
SAggFunctionInfo* ptr = &aggFunc[i];
taosHashPut(functionHashTable, aggFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
}
/*
numOfEntries = tListLen(scalarFunc);
for(int32_t i = 0; i < numOfEntries; ++i) {
int32_t len = (int32_t) strlen(scalarFunc[i].name);
SScalarFunctionInfo* ptr = &scalarFunc[i];
taosHashPut(functionHashTable, scalarFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
}
*/
udfHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, true);
}
static pthread_once_t functionHashTableInit = PTHREAD_ONCE_INIT;
int32_t qIsBuiltinFunction(const char* name, int32_t len, bool* scalarFunction) {
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, name, len);
if (pInfo != NULL) {
*scalarFunction = ((*pInfo)->type == FUNCTION_TYPE_SCALAR);
return (*pInfo)->functionId;
} else {
return -1;
}
}
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId) {
return true;
}
bool qIsAggregateFunction(const char* functionName) {
assert(functionName != NULL);
bool scalarfunc = false;
qIsBuiltinFunction(functionName, strlen(functionName), &scalarfunc);
return !scalarfunc;
}
bool qIsSelectivityFunction(const char* functionName) {
assert(functionName != NULL);
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
size_t len = strlen(functionName);
SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, functionName, len);
if (pInfo != NULL) {
return ((*pInfo)->status | FUNCSTATE_SELECTIVITY) != 0;
}
return false;
}
SAggFunctionInfo* qGetFunctionInfo(const char* name, int32_t len) {
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, name, len);
if (pInfo != NULL) {
return (*pInfo);
} else {
return NULL;
}
}
void qAddUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) {
int32_t len = (uint32_t)strlen(pUdfInfo->name);
taosHashPut(udfHashTable, pUdfInfo->name, len, (void*)&pUdfInfo, POINTER_BYTES);
}
void qRemoveUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) {
int32_t len = (uint32_t)strlen(pUdfInfo->name);
taosHashRemove(udfHashTable, pUdfInfo->name, len);
}
bool isTagsQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
// todo handle count(tbname) query
if (strcmp(f, "project") != 0 && strcmp(f, "count") != 0) {
return false;
}
// "select count(tbname)" query
// if (functId == FUNCTION_COUNT && pExpr->base.colpDesc->colId == TSDB_TBNAME_COLUMN_INDEX) {
// continue;
// }
}
return true;
}
//bool tscMultiRoundQuery(SArray* pFunctionIdList, int32_t index) {
// if (!UTIL_TABLE_IS_SUPER_TABLE(pQueryInfo->pTableMetaInfo[index])) {
// return false;
// }
//
// size_t numOfExprs = (int32_t) getNumOfExprs(pQueryInfo);
// for(int32_t i = 0; i < numOfExprs; ++i) {
// SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
// if (pExpr->base.functionId == FUNCTION_STDDEV_DST) {
// return true;
// }
// }
//
// return false;
//}
bool isProjectionQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
return true;
}
}
return false;
}
bool isDiffDerivativeQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TS_DUMMY) {
continue;
}
if (f == FUNCTION_DIFF || f == FUNCTION_DERIVATIVE) {
return true;
}
}
return false;
}
bool isInterpQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TAG || f == FUNCTION_TS) {
continue;
}
if (f != FUNCTION_INTERP) {
return false;
}
}
return true;
}
bool isArithmeticQueryOnAggResult(SArray* pFunctionIdList) {
if (isProjectionQuery(pFunctionIdList)) {
return false;
}
assert(0);
// size_t numOfOutput = getNumOfFields(pQueryInfo);
// for(int32_t i = 0; i < numOfOutput; ++i) {
// SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pExpr;
// if (pExprInfo->pExpr != NULL) {
// return true;
// }
// }
return false;
}
bool isGroupbyColumn(SGroupbyExpr* pGroupby) {
return !pGroupby->groupbyTag;
}
bool isTopBotQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
continue;
}
if (strcmp(f, "top") == 0 || strcmp(f, "bottom") == 0) {
return true;
}
}
return false;
}
bool isTsCompQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
if (num != 1) {
return false;
}
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, 0);
return f == FUNCTION_TS_COMP;
}
bool isTWAQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TWA) {
return true;
}
}
return false;
}
bool isIrateQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_IRATE) {
return true;
}
}
return false;
}
bool isStabledev(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_STDDEV_DST) {
return true;
}
}
return false;
}
bool needReverseScan(SArray* pFunctionIdList) {
assert(0);
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TS || f == FUNCTION_TS_DUMMY || f == FUNCTION_TAG) {
continue;
}
// if ((f == FUNCTION_FIRST || f == FUNCTION_FIRST_DST) && pQueryInfo->order.order == TSDB_ORDER_DESC) {
// return true;
// }
if (f == FUNCTION_LAST || f == FUNCTION_LAST_DST) {
// the scan order to acquire the last result of the specified column
// int32_t order = (int32_t)pExpr->base.param[0].i64;
// if (order != pQueryInfo->order.order) {
// return true;
// }
}
}
return false;
}
bool isAgg(SArray* pFunctionIdList) {
size_t size = taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < size; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
return false;
}
if (qIsAggregateFunction(f)) {
return true;
}
}
return false;
}
bool isBlockDistQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
char* f = *(char**) taosArrayGet(pFunctionIdList, 0);
return (num == 1 && strcmp(f, "block_dist") == 0);
}
bool isTwoStageSTableQuery(SArray* pFunctionIdList, int32_t tableIndex) {
// if (pQueryInfo == NULL) {
// return false;
// }
//
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
// if (pTableMetaInfo == NULL) {
// return false;
// }
//
// if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
// return false;
// }
//
// // for ordered projection query, iterate all qualified vnodes sequentially
// if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
// return false;
// }
//
// if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->command == TSDB_SQL_SELECT) {
// return UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
// }
return false;
}
bool isProjectionQueryOnSTable(SArray* pFunctionIdList, int32_t tableIndex) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
//
// /*
// * In following cases, return false for non ordered project query on super table
// * 1. failed to get tableMeta from server; 2. not a super table; 3. limitation is 0;
// * 4. show queries, instead of a select query
// */
// size_t numOfExprs = getNumOfExprs(pQueryInfo);
// if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) ||
// pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || numOfExprs == 0) {
// return false;
// }
//
// for (int32_t i = 0; i < numOfExprs; ++i) {
// int32_t functionId = getExprInfo(pQueryInfo, i)->base.functionId;
//
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
// if (pUdfInfo->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
// return false;
// }
//
// continue;
// }
//
// if (functionId != FUNCTION_PRJ &&
// functionId != FUNCTION_TAGPRJ &&
// functionId != FUNCTION_TAG &&
// functionId != FUNCTION_TS &&
// functionId != FUNCTION_ARITHM &&
// functionId != FUNCTION_TS_COMP &&
// functionId != FUNCTION_DIFF &&
// functionId != FUNCTION_DERIVATIVE &&
// functionId != FUNCTION_TS_DUMMY &&
// functionId != FUNCTION_TID_TAG) {
// return false;
// }
// }
return true;
}
bool hasTagValOutput(SArray* pFunctionIdList) {
size_t size = taosArrayGetSize(pFunctionIdList);
// if (numOfExprs == 1 && pExpr1->base.functionId == FUNCTION_TS_COMP) {
// return true;
// }
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = *(int16_t*) taosArrayGet(pFunctionIdList, i);
// ts_comp column required the tag value for join filter
if (functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
return true;
}
}
return false;
}
//bool timeWindowInterpoRequired(SArray* pFunctionIdList) {
// int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
// for (int32_t i = 0; i < num; ++i) {
// int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
// if (f == FUNCTION_TWA || f == FUNCTION_INTERP) {
// return true;
// }
// }
//
// return false;
//}
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc) {
assert(pFunctionIdList != NULL);
pDesc->blockDistribution = isBlockDistQuery(pFunctionIdList);
if (pDesc->blockDistribution) {
return;
}
// pDesc->projectionQuery = isProjectionQuery(pFunctionIdList);
// pDesc->onlyTagQuery = isTagsQuery(pFunctionIdList);
pDesc->interpQuery = isInterpQuery(pFunctionIdList);
pDesc->topbotQuery = isTopBotQuery(pFunctionIdList);
pDesc->agg = isAgg(pFunctionIdList);
}
......@@ -580,6 +580,8 @@ static const char* jkIntervalPhysiPlanFuncs = "Funcs";
static const char* jkIntervalPhysiPlanInterval = "Interval";
static const char* jkIntervalPhysiPlanOffset = "Offset";
static const char* jkIntervalPhysiPlanSliding = "Sliding";
static const char* jkIntervalPhysiPlanIntervalUnit = "intervalUnit";
static const char* jkIntervalPhysiPlanSlidingUnit = "slidingUnit";
static const char* jkIntervalPhysiPlanFill = "Fill";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
......@@ -601,6 +603,12 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSliding, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanIntervalUnit, pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSlidingUnit, pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill);
}
......@@ -627,6 +635,12 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanSliding, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkIntervalPhysiPlanIntervalUnit, &pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkIntervalPhysiPlanSlidingUnit, &pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill);
}
......@@ -1644,7 +1658,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToSubplan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN:
return jsonToPlan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return jsonToPhysiIntervalNode(pJson, pObj);
default:
assert(0);
break;
}
nodesWarn("jsonToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj)));
......
......@@ -310,9 +310,12 @@ static SLogicNode* createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInt
pWindow->node.id = pCxt->planNodeId++;
pWindow->winType = WINDOW_TYPE_INTERVAL;
pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i;
SValueNode* pIntervalNode = (SValueNode*)((SRawExprNode*)(pInterval->pInterval))->pNode;
pWindow->interval = pIntervalNode->datum.i;
pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0);
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : 0);
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
if (NULL != pInterval->pFill) {
pWindow->pFill = nodesCloneNode(pInterval->pFill);
CHECK_ALLOC(pWindow->pFill, (SLogicNode*)pWindow);
......
......@@ -480,6 +480,9 @@ static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* p
pInterval->interval = pWindowLogicNode->interval;
pInterval->offset = pWindowLogicNode->offset;
pInterval->sliding = pWindowLogicNode->sliding;
pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
pInterval->slidingUnit = pWindowLogicNode->slidingUnit;
pInterval->pFill = nodesCloneNode(pWindowLogicNode->pFill);
SNodeList* pPrecalcExprs = NULL;
......
......@@ -12,7 +12,7 @@ target_link_libraries(
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC zlib
PUBLIC zlibstatic
)
if (${BUILD_WITH_UV_TRANS})
if (${BUILD_WITH_UV})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册