提交 f69a885d 编写于 作者: H Haojun Liao

[td-10564]Add implementation in executor.

上级 778dd8a1
......@@ -19,7 +19,7 @@
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tvariant.h"
//typedef struct STimeWindow {
// TSKEY skey;
// TSKEY ekey;
......@@ -66,4 +66,44 @@ typedef struct SColumnInfoData {
char *pData; // the corresponding block data in memory
} SColumnInfoData;
//======================================================================================================================
// the following structure shared by parser and executor
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct SOrder {
uint32_t order;
int32_t orderColId;
} SOrder;
typedef struct SGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SGroupbyExpr;
// the structure for sql function in select clause
typedef struct SSqlExpr {
char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema;
SColIndex colInfo; // there may be mutiple input columns
uint64_t uid; // table uid, todo refactor use the pointer
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
} SSqlExpr;
typedef struct SExprInfo {
struct SSqlExpr base;
struct tExprNode *pExpr;
} SExprInfo;
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#endif // TDENGINE_COMMON_H
......@@ -16,6 +16,8 @@
#ifndef TDENGINE_TNAME_H
#define TDENGINE_TNAME_H
#include "taosmsg.h"
#define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2
......@@ -52,6 +54,8 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
int32_t tNameSetAcctId(SName* dst, const char* acct);
SSchema* tGetTbnameColumnSchema();
#if 0
int32_t tNameSetDbName(SName* dst, const char* acct, SToken* dbToken);
#endif
......
......@@ -21,7 +21,7 @@ extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
#include "tdef.h"
#include "tvariant.h"
#define MEM_BUF_SIZE (1 << 20)
......
......@@ -78,13 +78,28 @@ extern "C" {
#define FUNCTION_MODE 36
#define FUNCTION_SAMPLE 37
// determine the real data need to calculated the result
enum {
BLK_DATA_NO_NEEDED = 0x0,
BLK_DATA_STATIS_NEEDED = 0x1,
BLK_DATA_ALL_NEEDED = 0x3,
BLK_DATA_DISCARD = 0x4, // discard current data block since it is not qualified for filter
};
enum {
MASTER_SCAN = 0x0u,
REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
MERGE_STAGE = 0x20u,
};
typedef struct SPoint1 {
int64_t key;
union{double val; char* ptr;};
} SPoint1;
struct SQLFunctionCtx;
struct SResultRowCellInfo;
struct SResultRowEntryInfo;
//for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SExtTagsInfo {
......@@ -93,6 +108,8 @@ typedef struct SExtTagsInfo {
struct SQLFunctionCtx **pTagCtxList;
} SExtTagsInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
// sql function runtime context
typedef struct SQLFunctionCtx {
int32_t size; // number of rows
......@@ -117,9 +134,9 @@ typedef struct SQLFunctionCtx {
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SVariant tag;
bool isSmaSet;
SColumnDataAgg sma;
struct SResultRowCellInfo *resultInfo;
bool isAggSet;
SColumnDataAgg agg;
struct SResultRowEntryInfo *resultInfo;
SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
......@@ -161,7 +178,7 @@ typedef struct SAggFunctionInfo {
int8_t sFunctionId; // Transfer function for super table query
uint16_t status;
bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowCellInfo* pResultCellInfo); // setup the execute environment
bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*exec)(SQLFunctionCtx *pCtx);
// finalizer must be called after all exec has been executed to generated final result.
......@@ -176,7 +193,7 @@ typedef struct SScalarFunctionInfo {
int8_t type; // scalar function or aggregation function
uint8_t functionId; // index of scalar function
bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowCellInfo* pResultCellInfo); // setup the execute environment
bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*exec)(SQLFunctionCtx *pCtx);
} SScalarFunctionInfo;
......@@ -221,10 +238,48 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct
const char* qGetFunctionName(int32_t functionId);
tExprNode* exprTreeFromBinary(const void* data, size_t size);
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc);
tExprNode* exprdup(tExprNode* pTree);
void resetResultRowEntryResult(SQLFunctionCtx* pCtx, int32_t num);
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
int32_t getNumOfResult(SQLFunctionCtx* pCtx, int32_t num);
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);
struct SScalarFunctionSupport* createScalarFuncSupport(int32_t num);
void destroyScalarFuncSupport(struct SScalarFunctionSupport* pSupport, int32_t num);
struct SScalarFunctionSupport* getScalarFuncSupport(struct SScalarFunctionSupport* pSupport, int32_t index);
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// fill api
struct SFillInfo;
struct SFillColInfo;
typedef struct SPoint {
int64_t key;
void * val;
} SPoint;
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
struct SFillColInfo* pFillCol, void* handle);
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
#ifdef __cplusplus
}
#endif
......
......@@ -32,21 +32,7 @@ typedef struct SColumn {
SColumnInfo info;
} SColumn;
// the structure for sql function in select clause
typedef struct SSqlExpr {
char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema;
SColIndex colInfo;
uint64_t uid; // table uid, todo refactor use the pointer
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
} SSqlExpr;
typedef struct SExprInfo {
SSqlExpr base;
struct tExprNode *pExpr;
} SExprInfo;
//typedef struct SInterval {
// int32_t tz; // query client timezone
......@@ -63,13 +49,6 @@ typedef struct SExprInfo {
// int32_t primaryColId; // primary timestamp column
//} SSessionWindow;
typedef struct SGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SGroupbyExpr;
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
......@@ -82,16 +61,6 @@ typedef struct SFieldInfo {
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct SOrder {
uint32_t order;
int32_t orderColId;
} SOrder;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
......
......@@ -45,6 +45,8 @@ extern "C" {
#include <float.h>
#include <math.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "osAtomic.h"
#include "osDef.h"
......
......@@ -140,7 +140,8 @@ do { \
#define TSDB_UNARY_OP_ROUND 4503
#define TSDB_UNARY_OP_LEN 4600
#define TSDB_UNARY_OP_LTRIM 4601
#define TSDB_UNARY_OP_RTRIM 4601
#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN))
#define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER))
......
......@@ -8,5 +8,5 @@ target_include_directories(
target_link_libraries(
executor
PRIVATE os util common
PRIVATE os util common function parser
)
\ No newline at end of file
......@@ -15,6 +15,8 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
#include "common.h"
#include "tpagedfile.h"
#include "tbuffer.h"
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
......@@ -40,42 +42,92 @@
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr);
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv);
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
typedef struct SGroupResInfo {
int32_t totalGroup;
int32_t currentGroup;
int32_t index;
SArray* pRows; // SArray<SResultRow*>
bool ordered;
int32_t position;
} SGroupResInfo;
typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t offset:29; // row index in buffer page
bool startInterp; // the time window start timestamp has done the interpolation already.
bool endInterp; // the time window end timestamp has done the interpolation already.
bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window
struct SResultRowEntryInfo* pEntryInfo; // For each result column, there is a resultInfo
STimeWindow win;
char *key; // start key of current result row
} SResultRow;
typedef struct SResultRowInfo {
SResultRow** pResult; // result list
int16_t type:8; // data type for hash key
int32_t size:24; // number of result set
int32_t capacity; // max capacity
int32_t curPos; // current active result row index of pResult list
} SResultRowInfo;
typedef struct SResultRowPool {
int32_t elemSize;
int32_t blockSize;
int32_t numOfElemPerBlock;
struct {
int32_t blockIndex;
int32_t pos;
} position;
SArray* pData; // SArray<void*>
} SResultRowPool;
struct SQueryAttr;
struct SQueryRuntimeEnv;
struct SUdfInfo;
int32_t getOutputInterResultBufSize(struct SQueryAttr* pQueryAttr);
size_t getResultRowSize(struct SQueryRuntimeEnv* pRuntimeEnv);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
void resetResultRowInfo(struct SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo);
int32_t initResultRow(SResultRow *pResultRow);
void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
void clearResultRow(struct SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols);
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable);
int32_t getRowNumForMultioutput(struct SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
return pResultRowInfo->pResult[slot];
}
static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage* page, int32_t rowOffset,
static FORCE_INLINE char* getPosInResultPage(struct SQueryAttr* pQueryAttr, SFilePage* page, int32_t rowOffset,
int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL);
int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
// return ((char *)page->data) + rowOffset + offset * numOfRows;
}
bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
//bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
//bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
__filter_func_t getFilterOperator(int32_t lowerOptr, int32_t upperOptr);
......@@ -103,8 +155,8 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo);
bool incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t initUdfInfo(SUdfInfo* pUdfInfo);
int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
#endif // TDENGINE_QUERYUTIL_H
此差异已折叠。
......@@ -20,9 +20,9 @@
extern "C" {
#endif
#include "texpr.h"
#include "hash.h"
#include "thash.h"
#include "tname.h"
#include "function.h"
#define FILTER_DEFAULT_GROUP_SIZE 4
#define FILTER_DEFAULT_UNIT_SIZE 4
......@@ -105,7 +105,7 @@ typedef struct SFilterColRange {
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SDataStatis *, int16_t);
typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SColumnDataAgg *, int16_t);
typedef struct SFilterRangeCompare {
int64_t s;
......@@ -324,13 +324,13 @@ typedef struct SFilterInfo {
extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SDataStatis *statis, int16_t numOfCols);
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetColFieldData(SFilterInfo *info, int32_t numOfCols, SArray* pDataBlock);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SDataStatis *pDataStatis, int32_t numOfCols, int32_t numOfRows);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus
}
......
......@@ -15,11 +15,11 @@
#include "os.h"
#include "taosmsg.h"
#include "hash.h"
#include "thash.h"
#include "qExecutor.h"
#include "qUtil.h"
#include "queryLog.h"
#include "executil.h"
#include "executorimpl.h"
//#include "queryLog.h"
#include "tbuffer.h"
#include "tcompression.h"
#include "tlosertree.h"
......@@ -33,9 +33,9 @@ typedef struct SCompSupporter {
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) {
if (pQueryAttr && (!stable)) {
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM) {
return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64;
}
// if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) {
// return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i;
// }
}
}
......@@ -143,18 +143,18 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
// the result does not put into the SDiskbasedResultBuf, ignore it.
if (pResultRow->pageId >= 0) {
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i];
struct SResultRowEntryInfo *pEntryInfo = NULL;//pResultRow->pEntryInfo[i];
int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resType;
int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes;
char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
memset(s, 0, size);
offset += size;
RESET_RESULT_INFO(pResultInfo);
cleanupResultRowEntry(pEntryInfo);
}
}
......@@ -168,14 +168,16 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
}
// TODO refactor: use macro
SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) {
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) {
assert(index >= 0 && offset != NULL);
return (SResultRowCellInfo*)((char*) pRow->pCellInfo + offset[index]);
// return (SResultRowEntryInfo*)((char*) pRow->pCellInfo + offset[index]);
return NULL;
}
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
return (pQueryAttr->numOfOutput * sizeof(SResultRowCellInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow);
return 0;
// return (pQueryAttr->numOfOutput * sizeof(SResultRowEntryInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow);
}
SResultRowPool* initResultRowPool(size_t size) {
......@@ -271,9 +273,9 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen) {
tbufWriteUint32(bw, numOfRows);
for(int32_t k = 0; k < numOfRows; ++k) {
SResPair v = *(SResPair*) taosArrayGet(p->pResult, k);
tbufWriteDouble(bw, v.avg);
tbufWriteInt64(bw, v.key);
// SResPair v = *(SResPair*) taosArrayGet(p->pResult, k);
// tbufWriteDouble(bw, v.avg);
// tbufWriteInt64(bw, v.key);
}
}
}
......@@ -301,19 +303,19 @@ SArray* interResFromBinary(const char* data, int32_t len) {
SArray* p = taosArrayInit(numOfCols, sizeof(SStddevInterResult));
for(int32_t j = 0; j < numOfCols; ++j) {
int16_t colId = tbufReadUint16(&br);
// int16_t colId = tbufReadUint16(&br);
int32_t numOfRows = tbufReadUint32(&br);
SStddevInterResult interRes = {.colId = colId, .pResult = taosArrayInit(4, sizeof(struct SResPair)),};
// SStddevInterResult interRes = {.colId = colId, .pResult = taosArrayInit(4, sizeof(struct SResPair)),};
for(int32_t k = 0; k < numOfRows; ++k) {
SResPair px = {0};
px.avg = tbufReadDouble(&br);
px.key = tbufReadInt64(&br);
taosArrayPush(interRes.pResult, &px);
// SResPair px = {0};
// px.avg = tbufReadDouble(&br);
// px.key = tbufReadInt64(&br);
//
// taosArrayPush(interRes.pResult, &px);
}
taosArrayPush(p, &interRes);
// taosArrayPush(p, &interRes);
}
char* p1 = NULL;
......@@ -395,22 +397,22 @@ static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) {
int32_t functionId = pQueryAttr->pExpr1[j].base.functionId;
int32_t functionId = 0;//pQueryAttr->pExpr1[j].base.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
continue;
}
SResultRowCellInfo *pResultInfo = getResultCell(pResultRow, j, rowCellInfoOffset);
assert(pResultInfo != NULL);
if (pResultInfo->numOfRes > 0) {
return pResultInfo->numOfRes;
}
// SResultRowEntryInfo *pResultInfo = getResultCell(pResultRow, j, rowCellInfoOffset);
// assert(pResultInfo != NULL);
//
// if (pResultInfo->numOfRes > 0) {
// return pResultInfo->numOfRes;
// }
}
return 0;
......@@ -545,7 +547,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEn
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
qError("QInfo:%"PRIu64" failed alloc memory", GET_QID(pRuntimeEnv));
// qError("QInfo:%"PRIu64" failed alloc memory", GET_QID(pRuntimeEnv));
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
......@@ -617,8 +619,8 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEn
int64_t endt = taosGetTimestampMs();
qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv),
pGroupResInfo->currentGroup, endt - startt);
// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv),
// pGroupResInfo->currentGroup, endt - startt);
_end:
tfree(pTableQueryInfoList);
......@@ -639,90 +641,90 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
break;
}
qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup);
// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup);
cleanupGroupResInfo(pGroupResInfo);
incNextGroup(pGroupResInfo);
}
int64_t elapsedTime = taosGetTimestampUs() - st;
qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv),
pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
// int64_t elapsedTime = taosGetTimestampUs() - st;
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv),
// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
return TSDB_CODE_SUCCESS;
}
void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
tbufWriteUint32(bw, pDist->numOfTables);
tbufWriteUint16(bw, pDist->numOfFiles);
tbufWriteUint64(bw, pDist->totalSize);
tbufWriteUint64(bw, pDist->totalRows);
tbufWriteInt32(bw, pDist->maxRows);
tbufWriteInt32(bw, pDist->minRows);
tbufWriteUint32(bw, pDist->numOfRowsInMemTable);
tbufWriteUint32(bw, pDist->numOfSmallBlocks);
tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
// compress the binary string
char* p = TARRAY_GET_START(pDist->dataBlockInfos);
// compress extra bytes
size_t x = taosArrayGetSize(pDist->dataBlockInfos) * pDist->dataBlockInfos->elemSize;
char* tmp = malloc(x + 2);
bool comp = false;
int32_t len = tsCompressString(p, (int32_t)x, 1, tmp, (int32_t)x, ONE_STAGE_COMP, NULL, 0);
if (len == -1 || len >= x) { // compress failed, do not compress this binary data
comp = false;
len = (int32_t)x;
} else {
comp = true;
}
tbufWriteUint8(bw, comp);
tbufWriteUint32(bw, len);
if (comp) {
tbufWriteBinary(bw, tmp, len);
} else {
tbufWriteBinary(bw, p, len);
}
tfree(tmp);
}
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) {
SBufferReader br = tbufInitReader(data, len, false);
pDist->numOfTables = tbufReadUint32(&br);
pDist->numOfFiles = tbufReadUint16(&br);
pDist->totalSize = tbufReadUint64(&br);
pDist->totalRows = tbufReadUint64(&br);
pDist->maxRows = tbufReadInt32(&br);
pDist->minRows = tbufReadInt32(&br);
pDist->numOfRowsInMemTable = tbufReadUint32(&br);
pDist->numOfSmallBlocks = tbufReadUint32(&br);
int64_t numSteps = tbufReadUint64(&br);
bool comp = tbufReadUint8(&br);
uint32_t compLen = tbufReadUint32(&br);
size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo));
char* outputBuf = NULL;
if (comp) {
outputBuf = malloc(originalLen);
size_t actualLen = compLen;
const char* compStr = tbufReadBinary(&br, &actualLen);
int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
(int32_t)originalLen , ONE_STAGE_COMP, NULL, 0);
assert(orignalLen == numSteps *sizeof(SFileBlockInfo));
} else {
outputBuf = (char*) tbufReadBinary(&br, &originalLen);
}
pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo));
if (comp) {
tfree(outputBuf);
}
}
//void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
// tbufWriteUint32(bw, pDist->numOfTables);
// tbufWriteUint16(bw, pDist->numOfFiles);
// tbufWriteUint64(bw, pDist->totalSize);
// tbufWriteUint64(bw, pDist->totalRows);
// tbufWriteInt32(bw, pDist->maxRows);
// tbufWriteInt32(bw, pDist->minRows);
// tbufWriteUint32(bw, pDist->numOfRowsInMemTable);
// tbufWriteUint32(bw, pDist->numOfSmallBlocks);
// tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
//
// // compress the binary string
// char* p = TARRAY_GET_START(pDist->dataBlockInfos);
//
// // compress extra bytes
// size_t x = taosArrayGetSize(pDist->dataBlockInfos) * pDist->dataBlockInfos->elemSize;
// char* tmp = malloc(x + 2);
//
// bool comp = false;
// int32_t len = tsCompressString(p, (int32_t)x, 1, tmp, (int32_t)x, ONE_STAGE_COMP, NULL, 0);
// if (len == -1 || len >= x) { // compress failed, do not compress this binary data
// comp = false;
// len = (int32_t)x;
// } else {
// comp = true;
// }
//
// tbufWriteUint8(bw, comp);
// tbufWriteUint32(bw, len);
// if (comp) {
// tbufWriteBinary(bw, tmp, len);
// } else {
// tbufWriteBinary(bw, p, len);
// }
// tfree(tmp);
//}
//void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) {
// SBufferReader br = tbufInitReader(data, len, false);
//
// pDist->numOfTables = tbufReadUint32(&br);
// pDist->numOfFiles = tbufReadUint16(&br);
// pDist->totalSize = tbufReadUint64(&br);
// pDist->totalRows = tbufReadUint64(&br);
// pDist->maxRows = tbufReadInt32(&br);
// pDist->minRows = tbufReadInt32(&br);
// pDist->numOfRowsInMemTable = tbufReadUint32(&br);
// pDist->numOfSmallBlocks = tbufReadUint32(&br);
// int64_t numSteps = tbufReadUint64(&br);
//
// bool comp = tbufReadUint8(&br);
// uint32_t compLen = tbufReadUint32(&br);
//
// size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo));
//
// char* outputBuf = NULL;
// if (comp) {
// outputBuf = malloc(originalLen);
//
// size_t actualLen = compLen;
// const char* compStr = tbufReadBinary(&br, &actualLen);
//
// int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
// (int32_t)originalLen , ONE_STAGE_COMP, NULL, 0);
// assert(orignalLen == numSteps *sizeof(SFileBlockInfo));
// } else {
// outputBuf = (char*) tbufReadBinary(&br, &originalLen);
// }
//
// pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo));
// if (comp) {
// tfree(outputBuf);
// }
//}
此差异已折叠。
此差异已折叠。
......@@ -30,12 +30,12 @@ extern "C" {
extern SAggFunctionInfo aggFunc[34];
typedef struct SResultRowCellInfo {
typedef struct SResultRowEntryInfo {
int8_t hasResult; // result generated, not NULL value
bool initialized; // output buffer has been initialized
bool complete; // query has completed
uint32_t numOfRes; // num of output result in current buffer
} SResultRowCellInfo;
} SResultRowEntryInfo;
#define FUNCSTATE_SO 0x0u
#define FUNCSTATE_MO 0x1u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -52,54 +52,24 @@ typedef struct SResultRowCellInfo {
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define TOP_BOTTOM_QUERY_LIMIT 100
enum {
MASTER_SCAN = 0x0u,
REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
MERGE_STAGE = 0x20u,
};
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
#define QUERY_IS_JOIN_QUERY(type) (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_QUERY))
#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0)
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
typedef struct SArithmeticSupport {
struct SExprInfo *pExprInfo;
int32_t numOfCols;
SColumnInfo *colList;
void *exprList; // client side used
int32_t offset;
char** data;
} SArithmeticSupport;
typedef struct SInterpInfoDetail {
TSKEY ts; // interp specified timestamp
int8_t type;
int8_t primaryCol;
} SInterpInfoDetail;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo)))
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
// determine the real data need to calculated the result
enum {
BLK_DATA_NO_NEEDED = 0x0,
BLK_DATA_STATIS_NEEDED = 0x1,
BLK_DATA_ALL_NEEDED = 0x3,
BLK_DATA_DISCARD = 0x4, // discard current data block since it is not qualified for filter
};
typedef struct STwaInfo {
int8_t hasResult; // flag to denote has value
double dOutput;
......@@ -115,12 +85,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
* the numOfRes should be kept, since it may be used later
* and allow the ResultInfo to be re initialized
*/
#define RESET_RESULT_INFO(_r) \
do { \
(_r)->initialized = false; \
} while (0)
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, int32_t bufLen) {
static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32_t bufLen) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
......
......@@ -60,7 +60,6 @@ typedef struct SExprTraverseSupp {
void *pExtInfo;
} SExprTraverseSupp;
tExprNode* exprTreeFromBinary(const void* data, size_t size);
tExprNode* exprTreeFromTableName(const char* tbnameCond);
bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册