提交 77dfb9fe 编写于 作者: H Haojun Liao

[td-2859]refactor.

上级 4b86150b
......@@ -62,6 +62,8 @@ typedef struct SLocalMerger {
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
char* tagBuf; // max tag buffer
int32_t tagBufLen;
} SLocalMerger;
typedef struct SRetrieveSupport {
......
......@@ -98,10 +98,9 @@ typedef struct SMergeTsCtx {
int8_t compared;
}SMergeTsCtx;
typedef struct SVgroupTableInfo {
SVgroupInfo vgInfo;
SArray* itemList; //SArray<STableIdInfo>
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) {
......@@ -321,7 +320,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -213,7 +213,7 @@ typedef struct SQueryInfo {
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
SQInfo* pQInfo; // global merge operator
SArray* pDSOperator; // data source operator
SArray* pPhyOperator; // physical query execution plan
SQueryAttr* pQueryAttr; // query object
......@@ -420,6 +420,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......
此差异已折叠。
......@@ -1573,12 +1573,40 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
return code;
}
pRes->code = tscDoLocalMerge(pSql);
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
if (pQueryInfo->pQInfo == NULL) {
STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),};
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN};
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(group, &tableKeyInfo);
taosArrayPush(tableGroupInfo.pGroupList, &group);
SExprInfo* list = calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SExprInfo));
for(int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExprInfo = tscSqlExprGet(pQueryInfo, i);
list[i] = *pExprInfo;
}
pQueryInfo->pQInfo = createQueryInfoFromQueryNode(pQueryInfo, list, &tableGroupInfo, NULL, NULL, pRes->pLocalMerger, MERGE_STAGE);
}
uint64_t localQueryId = 0;
SMultiwayMergeInfo* pInfo = (SMultiwayMergeInfo*) pQueryInfo->pQInfo->runtimeEnv.proot->info;
pInfo->pMerge = pRes->pLocalMerger;
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
//pRes->code = tscDoLocalMerge(pSql);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo);
tscSetResRawPtrRv(pRes, pQueryInfo, p);
// tscSetResRawPtr(pRes, pQueryInfo);
}
pRes->row = 0;
......
......@@ -3467,7 +3467,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(&pSql->pSubs[i]->cmd, 0);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) &&
tscIsProjectionQuery(pQueryInfo1)) || (pRes1->numOfRows == 0)) {
tscIsProjectionQuery(pQueryInfo1)) ||
(pRes1->numOfRows == 0)) {
hasData = false;
break;
}
......@@ -3477,7 +3478,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData;
}
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr) {
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) {
assert(pQueryInfo != NULL);
int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
......@@ -3493,7 +3495,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->pQueryAttr = pQueryAttr;
tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, addr);
tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, NULL);
pQueryAttr->tableGroupInfo = *pTableGroupInfo;
......@@ -3580,15 +3582,17 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
tfree(pExprs);
SArray* pa = createExecOperatorPlan(pQueryAttr);
SArray* pa = NULL;
if (stage == MASTER_SCAN) {
pa = createExecOperatorPlan(pQueryAttr);
} else {
pa = createGlobalMergePlan(pQueryAttr);
}
STsBufInfo bufInfo = {0};
SQueryParam param = {.pOperator = pa};
/*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, &param, NULL, 0);
pQInfo->runtimeEnv.proot->upstream = pOperator;
qTableQuery(pQInfo, NULL);
/*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, &param, NULL, 0, merger);
// pQInfo->runtimeEnv.proot->upstream = pSourceOperator;
return pQInfo;
_cleanup:
......
......@@ -522,6 +522,75 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
}
}
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock) {
assert(pRes->numOfCols > 0);
int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
pRes->urow[i] = pColData->pData + offset * pColData->info.bytes;
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
// generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) {
setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows);
} else {
if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
assert(pInfo->pExpr->base.param[1].nLen <= pInfo->field.bytes);
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(varDataVal(p), pInfo->pExpr->base.param[1].pz, pInfo->pExpr->base.param[1].nLen);
varDataSetLen(p, pInfo->pExpr->base.param[1].nLen);
}
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pExpr->base.param[1].i64, pInfo->field.bytes);
}
}
}
} else if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
char* p = pRes->urow[i];
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
memcpy(dst, p, varDataTLen(p));
} else if (varDataLen(p) > 0) {
int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
varDataSetLen(dst, length);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
} else {
varDataSetLen(dst, 0);
}
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}
}
}
static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* pTableCols) {
int32_t numOfCols = taosArrayGetSize(pTableCols);
SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo));
......@@ -626,8 +695,7 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput);
SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL);
//printf("%p\n", pQInfo);
SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN);
SSDataBlock* pres = pQInfo->runtimeEnv.outputBuf;
// build result
......@@ -3250,6 +3318,60 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
return TSDB_CODE_SUCCESS;
}
static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo) {
assert(tscIsTwoStageSTableQuery(pQueryInfo, 0));
pQueryAttr->numOfExpr3 = tscNumOfFields(pQueryInfo);
pQueryAttr->pExpr3 = calloc(pQueryAttr->numOfExpr3, sizeof(SExprInfo));
if (pQueryAttr->pExpr3 == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pQueryAttr->numOfExpr3; ++i) {
SExprInfo* pExpr = &pQueryAttr->pExpr1[i];
SSqlExpr* pse = &pQueryAttr->pExpr3[i].base;
*pse = pExpr->base;
pse->colInfo.colId = pExpr->base.resColId;
pse->colInfo.colIndex = i;
pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes;
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
}
}
{
for (int32_t i = 0; i < pQueryAttr->numOfExpr3; ++i) {
SExprInfo* pExpr = &pQueryAttr->pExpr1[i];
SSqlExpr* pse = &pQueryAttr->pExpr3[i].base;
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
int32_t inter = 0;
int32_t functionId = pExpr->base.functionId;
if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
continue;
}
if (functionId == TSDB_FUNC_FIRST_DST) {
functionId = TSDB_FUNC_FIRST;
} else if (functionId == TSDB_FUNC_LAST_DST) {
functionId = TSDB_FUNC_LAST;
} else if (functionId == TSDB_FUNC_STDDEV_DST) {
functionId = TSDB_FUNC_STDDEV;
}
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType,
&pse->resBytes, &inter, 0, false);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) {
if (pTableMetaInfo->tagColList == NULL) {
return TSDB_CODE_SUCCESS;
......@@ -3352,6 +3474,11 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
return code;
}
// global aggregate query
if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0) && tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
createGlobalAggregateExpr(pQueryAttr, pQueryInfo);
}
// tag column info
code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -210,9 +210,6 @@ typedef struct SAggFunctionInfo {
void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function
void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function
// some sql function require scan data twice or more, e.g.,stddev, percentile
void (*xNextStep)(SQLFunctionCtx *pCtx);
// finalizer must be called after all xFunction has been executed to generated final result.
void (*xFinalize)(SQLFunctionCtx *pCtx);
void (*mergeFunc)(SQLFunctionCtx *pCtx);
......
......@@ -212,9 +212,13 @@ typedef struct SQueryAttr {
int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query
SSqlGroupbyExpr* pGroupbyExpr;
SExprInfo* pExpr1;
SExprInfo* pExpr2;
int32_t numOfExpr2;
SExprInfo* pExpr3;
int32_t numOfExpr3;
SColumnInfo* colList;
SColumnInfo* tagColList;
int32_t numOfFilterCols;
......@@ -290,7 +294,8 @@ enum OPERATOR_TYPE_E {
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
OP_DummyInput = 16, //TODO remove it after fully refactor.
OP_MultiwayMerge = 17, // multi-way merge process for partial results from different vnodes
OP_MultiwaySort = 17, // multi-way data merge into one input stream.
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
};
typedef struct SOperatorInfo {
......@@ -412,10 +417,6 @@ typedef struct SLimitOperatorInfo {
int64_t total;
} SLimitOperatorInfo;
typedef struct SOffsetOperatorInfo {
int64_t offset;
} SOffsetOperatorInfo;
typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo;
SSDataBlock *pRes;
......@@ -436,6 +437,17 @@ typedef struct SSWindowOperatorInfo {
int32_t start; // start row index
} SSWindowOperatorInfo;
struct SLocalMerger;
typedef struct SMultiwayMergeInfo {
struct SLocalMerger *pMerge;
SOptrBasicInfo binfo;
int64_t seed;
char **prevRow;
bool hasPrev;
SArray *orderColumnList;
} SMultiwayMergeInfo;
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
......@@ -451,7 +463,14 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, int32_t* orderColumn, int32_t numOfOrder);
SSDataBlock* doGlobalAggregate(void* param);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
......@@ -466,7 +485,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t prevResultLen);
int32_t prevResultLen, void* merger);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
......
......@@ -237,6 +237,9 @@ int32_t compare_a(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1
int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1, int32_t numOfRow2, int32_t s2,
char *data2);
struct SSDataBlock;
int32_t compare_aRv(struct SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order);
#ifdef __cplusplus
}
#endif
......
......@@ -19,5 +19,6 @@
//TODO refactor
SArray* createTableScanPlan(SQueryAttr* pQueryAttr);
SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr);
SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr);
#endif // TDENGINE_QPLAN_H
......@@ -375,12 +375,6 @@ int32_t isValidFunction(const char* name, int32_t len) {
return -1;
}
// set the query flag to denote that query is completed
static void no_next_step(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->complete = true;
}
static bool function_setup(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->initialized) {
......@@ -1540,7 +1534,7 @@ static void stddev_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
}
static void stddev_next_step(SQLFunctionCtx *pCtx) {
static UNUSED_FUNC void stddev_next_step(SQLFunctionCtx *pCtx) {
/*
* the stddevInfo and the average info struct share the same buffer area
* And the position of each element in their struct is exactly the same matched
......@@ -2907,7 +2901,7 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
static void percentile_next_step(SQLFunctionCtx *pCtx) {
static UNUSED_FUNC void percentile_next_step(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -4891,7 +4885,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
count_function,
count_function_f,
no_next_step,
doFinalizer,
count_func_merge,
countRequired,
......@@ -4905,7 +4898,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
sum_function,
sum_function_f,
no_next_step,
function_finalizer,
sum_func_merge,
statisRequired,
......@@ -4919,7 +4911,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
avg_function,
avg_function_f,
no_next_step,
avg_finalizer,
avg_func_merge,
statisRequired,
......@@ -4933,7 +4924,6 @@ SAggFunctionInfo aAggs[] = {{
min_func_setup,
min_function,
min_function_f,
no_next_step,
function_finalizer,
min_func_merge,
statisRequired,
......@@ -4947,7 +4937,6 @@ SAggFunctionInfo aAggs[] = {{
max_func_setup,
max_function,
max_function_f,
no_next_step,
function_finalizer,
max_func_merge,
statisRequired,
......@@ -4961,7 +4950,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
stddev_function,
stddev_function_f,
stddev_next_step,
stddev_finalizer,
noop1,
dataBlockRequired,
......@@ -4975,7 +4963,6 @@ SAggFunctionInfo aAggs[] = {{
percentile_function_setup,
percentile_function,
percentile_function_f,
percentile_next_step,
percentile_finalizer,
noop1,
dataBlockRequired,
......@@ -4989,7 +4976,6 @@ SAggFunctionInfo aAggs[] = {{
apercentile_function_setup,
apercentile_function,
apercentile_function_f,
no_next_step,
apercentile_finalizer,
apercentile_func_merge,
dataBlockRequired,
......@@ -5003,7 +4989,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
first_function,
first_function_f,
no_next_step,
function_finalizer,
noop1,
firstFuncRequired,
......@@ -5017,7 +5002,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
last_function,
last_function_f,
no_next_step,
function_finalizer,
noop1,
lastFuncRequired,
......@@ -5032,7 +5016,6 @@ SAggFunctionInfo aAggs[] = {{
first_last_function_setup,
last_row_function,
noop2,
no_next_step,
last_row_finalizer,
last_dist_func_merge,
dataBlockRequired,
......@@ -5047,7 +5030,6 @@ SAggFunctionInfo aAggs[] = {{
top_bottom_function_setup,
top_function,
top_function_f,
no_next_step,
top_bottom_func_finalizer,
top_func_merge,
dataBlockRequired,
......@@ -5062,7 +5044,6 @@ SAggFunctionInfo aAggs[] = {{
top_bottom_function_setup,
bottom_function,
bottom_function_f,
no_next_step,
top_bottom_func_finalizer,
bottom_func_merge,
dataBlockRequired,
......@@ -5076,7 +5057,6 @@ SAggFunctionInfo aAggs[] = {{
spread_function_setup,
spread_function,
spread_function_f,
no_next_step,
spread_function_finalizer,
spread_func_merge,
countRequired,
......@@ -5090,7 +5070,6 @@ SAggFunctionInfo aAggs[] = {{
twa_function_setup,
twa_function,
twa_function_f,
no_next_step,
twa_function_finalizer,
twa_function_copy,
dataBlockRequired,
......@@ -5104,7 +5083,6 @@ SAggFunctionInfo aAggs[] = {{
leastsquares_function_setup,
leastsquares_function,
leastsquares_function_f,
no_next_step,
leastsquares_finalizer,
noop1,
dataBlockRequired,
......@@ -5118,7 +5096,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
date_col_output_function,
date_col_output_function_f,
no_next_step,
doFinalizer,
copy_function,
noDataRequired,
......@@ -5132,7 +5109,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
noop1,
noop2,
no_next_step,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -5146,7 +5122,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
tag_function,
noop2,
no_next_step,
doFinalizer,
copy_function,
noDataRequired,
......@@ -5160,7 +5135,6 @@ SAggFunctionInfo aAggs[] = {{
ts_comp_function_setup,
ts_comp_function,
ts_comp_function_f,
no_next_step,
ts_comp_finalize,
copy_function,
dataBlockRequired,
......@@ -5174,7 +5148,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
tag_function,
tag_function_f,
no_next_step,
doFinalizer,
copy_function,
noDataRequired,
......@@ -5188,7 +5161,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
col_project_function,
col_project_function_f,
no_next_step,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -5202,7 +5174,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
tag_project_function,
tag_project_function_f,
no_next_step,
doFinalizer,
copy_function,
noDataRequired,
......@@ -5216,7 +5187,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
arithmetic_function,
arithmetic_function_f,
no_next_step,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -5230,7 +5200,6 @@ SAggFunctionInfo aAggs[] = {{
diff_function_setup,
diff_function,
diff_function_f,
no_next_step,
doFinalizer,
noop1,
dataBlockRequired,
......@@ -5245,7 +5214,6 @@ SAggFunctionInfo aAggs[] = {{
first_last_function_setup,
first_dist_function,
first_dist_function_f,
no_next_step,
function_finalizer,
first_dist_func_merge,
firstDistFuncRequired,
......@@ -5259,7 +5227,6 @@ SAggFunctionInfo aAggs[] = {{
first_last_function_setup,
last_dist_function,
last_dist_function_f,
no_next_step,
function_finalizer,
last_dist_func_merge,
lastDistFuncRequired,
......@@ -5273,7 +5240,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
stddev_dst_function,
stddev_dst_function_f,
no_next_step,
stddev_dst_finalizer,
stddev_dst_merge,
dataBlockRequired,
......@@ -5287,7 +5253,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
interp_function,
do_sum_f, // todo filter handle
no_next_step,
doFinalizer,
copy_function,
dataBlockRequired,
......@@ -5301,7 +5266,6 @@ SAggFunctionInfo aAggs[] = {{
rate_function_setup,
rate_function,
rate_function_f,
no_next_step,
rate_finalizer,
rate_func_copy,
dataBlockRequired,
......@@ -5315,7 +5279,6 @@ SAggFunctionInfo aAggs[] = {{
rate_function_setup,
irate_function,
irate_function_f,
no_next_step,
rate_finalizer,
rate_func_copy,
dataBlockRequired,
......@@ -5329,7 +5292,6 @@ SAggFunctionInfo aAggs[] = {{
rate_function_setup,
rate_function,
rate_function_f,
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
dataBlockRequired,
......@@ -5343,7 +5305,6 @@ SAggFunctionInfo aAggs[] = {{
rate_function_setup,
irate_function,
irate_function_f,
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
dataBlockRequired,
......@@ -5357,7 +5318,6 @@ SAggFunctionInfo aAggs[] = {{
rate_function_setup,
rate_function,
rate_function_f,
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
dataBlockRequired,
......@@ -5371,7 +5331,6 @@ SAggFunctionInfo aAggs[] = {{
rate_function_setup,
irate_function,
irate_function_f,
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
dataBlockRequired,
......@@ -5385,7 +5344,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
noop1,
noop2,
no_next_step,
noop1,
noop1,
dataBlockRequired,
......@@ -5399,7 +5357,6 @@ SAggFunctionInfo aAggs[] = {{
function_setup,
blockInfo_func,
noop2,
no_next_step,
blockinfo_func_finalizer,
block_func_merge,
dataBlockRequired,
......
......@@ -180,7 +180,7 @@ static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowIn
int32_t groupIndex);
// setup the output buffer for each operator
static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
const static int32_t minSize = 8;
SSDataBlock *res = calloc(1, sizeof(SSDataBlock));
......@@ -351,7 +351,6 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
prepareResultListBuffer(pResultRowInfo, pRuntimeEnv);
SResultRow *pResult = NULL;
if (p1 == NULL) {
pResult = getNewResultRow(pRuntimeEnv->pool);
int32_t ret = initResultRow(pResult);
......@@ -879,7 +878,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pC
}
}
static void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
if (pCtx[0].functionId == TSDB_FUNC_ARITHM) {
SArithmeticSupport* pSupport = (SArithmeticSupport*) pCtx[0].param[1].pz;
if (pSupport->colList == NULL) {
......@@ -1618,7 +1617,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL;
}
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator) {
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator, void* merger) {
qDebug("QInfo:%"PRIu64" setup runtime env", GET_QID(pRuntimeEnv));
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
......@@ -1728,80 +1727,24 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
break;
}
default: {
assert(0);
case OP_MultiwaySort: {
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3,
4096, merger); // TODO hack it
break;
}
}
}
/*
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
} else if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
if (pQueryAttr->stableQuery) {
pRuntimeEnv->proot = createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner,
pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
} else {
pRuntimeEnv->proot =
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQueryAttr->pExpr2 != NULL) {
pRuntimeEnv->proot =
createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
case OP_GlobalAggregate: {
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, &pQueryAttr->order.orderColId, 1);
break;
}
if (pQueryAttr->fillType != TSDB_FILL_NONE && !pQueryAttr->pointInterpQuery) {
SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput);
default: {
assert(0);
}
}
} else if (pQueryAttr->groupbyColumn) {
pRuntimeEnv->proot =
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQueryAttr->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
}
} else if (pQueryAttr->sw.gap > 0) {
pRuntimeEnv->proot = createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQueryAttr->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
}
} else if (pQueryAttr->simpleAgg) {
if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) {
pRuntimeEnv->proot =
createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
} else {
pRuntimeEnv->proot =
createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
}
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
}
} else { // diff/add/multiply/subtract/division
if (!onlyQueryTags(pQueryAttr)) {
pRuntimeEnv->proot =
createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
}
}
if (pQueryAttr->limit.offset > 0) {
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
}
if (pQueryAttr->limit.limit > 0) {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
}
*/
return TSDB_CODE_SUCCESS;
_clean:
......@@ -3918,7 +3861,8 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
return pFillCol;
}
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t tbScanner, SArray* pOperator) {
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t tbScanner, SArray* pOperator,
void* param) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
......@@ -3981,7 +3925,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
// create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator);
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4355,6 +4299,55 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
return pOptr;
}
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput,
int32_t* orderColumn, int32_t numOfOrder) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows =
(int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
// TODO refactor
int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.resBytes;
}
pInfo->prevRow = taosArrayInit(numOfOrder, (POINTER_BYTES * numOfOrder + len));
int32_t offset = POINTER_BYTES * numOfOutput;
for(int32_t i = 0; i < numOfOrder; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
int32_t index = orderColumn[i];
offset += pExpr[index].base.resBytes;
}
pInfo->orderColumnList = taosArrayFromList(orderColumn, numOfOrder, sizeof(int32_t));
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GlobalAggregate";
pOperator->operatorType = OP_GlobalAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doGlobalAggregate;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order;
}
......@@ -5122,6 +5115,25 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator;
}
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SLimitOperator";
pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->exec = doLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
return pOperator;
}
static SSDataBlock* doTagScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -6264,7 +6276,8 @@ bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo);
}
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t prevResultLen) {
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t prevResultLen, void* merger) {
int32_t code = TSDB_CODE_SUCCESS;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -6309,7 +6322,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryPara
}
// filter the qualified
if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, param->tableScanOperator, param->pOperator)) != TSDB_CODE_SUCCESS) {
if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) {
goto _error;
}
......
......@@ -20,6 +20,7 @@
#include "taosdef.h"
#include "taosmsg.h"
#include "tulog.h"
#include "qExecutor.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
......@@ -351,6 +352,18 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
}
}
static int32_t tsCompareFunc(TSKEY k1, TSKEY k2, int32_t order) {
if (k1 == k2) {
return 0;
}
if (order == TSDB_ORDER_DESC) {
return (k1 < k2)? 1:-1;
} else {
return (k1 < k2)? -1:1;
}
}
static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) {
switch (type) {
case TSDB_DATA_TYPE_INT: {
......@@ -451,6 +464,51 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
return 0;
}
int32_t compare_aRv(SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order) {
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t index = colIndex[i];
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index);
char* data = pColInfo->pData + rowIndex * pColInfo->info.bytes;
if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
int32_t ret = tsCompareFunc(GET_INT64_VAL(data), GET_INT64_VAL(buffer[i]), order);
if (ret == 0) {
continue; // The timestamps are identical
} else {
return ret;
}
} else {
int32_t ret = columnValueAscendingComparator(data, buffer[i], pColInfo->info.type, pColInfo->info.bytes);
if (ret == 0) {
continue;
} else {
return ret;
}
}
// char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
// char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
// if (pDescriptor->pColumnModel->pFields[colIdx].field.type == TSDB_DATA_TYPE_TIMESTAMP) {
// int32_t ret = primaryKeyComparator(*(int64_t *)f1, *(int64_t *)f2, colIdx, pDescriptor->tsOrder);
// if (ret == 0) {
// continue;
// } else {
// return ret;
// }
// } else {
// SSchemaEx *pSchema = &pDescriptor->pColumnModel->pFields[colIdx];
// int32_t ret = columnValueAscendingComparator(f1, f2, pSchema->field.type, pSchema->field.bytes);
// if (ret == 0) {
// continue;
// } else {
// return ret;
// }
// }
}
return 0;
}
int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, char *data1, int32_t numOfRows2,
int32_t s2, char *data2) {
assert(numOfRows1 == numOfRows2);
......
......@@ -125,4 +125,37 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
return plan;
}
SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
SArray* plan = taosArrayInit(4, sizeof(int32_t));
if (!pQueryAttr->stableQuery) {
return plan;
}
// todo: exchange operator?
int32_t op = OP_MultiwaySort;
taosArrayPush(plan, &op);
// fill operator
if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) {
op = OP_Fill;
taosArrayPush(plan, &op);
}
// arithmetic operator
if (!pQueryAttr->simpleAgg && pQueryAttr->interval.interval == 0) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
} else {
op = OP_GlobalAggregate;
taosArrayPush(plan, &op);
}
// limit/offset operator
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
op = OP_SLimit;
taosArrayPush(plan, &op);
}
return plan;
}
......@@ -172,7 +172,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
code = initQInfo(&pQueryMsg->tsBuf, tsdb, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen);
code = initQInfo(&pQueryMsg->tsBuf, tsdb, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
_over:
if (param.pGroupbyExpr != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册