提交 dfeb75fe 编写于 作者: X xywang

feat: added min_row & max_row functions

上级 08810f73
......@@ -2875,6 +2875,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_TWA:
case TSDB_FUNC_MIN:
case TSDB_FUNC_MAX:
case TSDB_FUNC_MIN_ROW:
case TSDB_FUNC_MAX_ROW:
case TSDB_FUNC_DIFF:
case TSDB_FUNC_DERIVATIVE:
case TSDB_FUNC_CSUM:
......@@ -2961,6 +2963,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) &&
(functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
int16_t resultType = 0;
......@@ -8294,13 +8298,15 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC
pInfo->visible = false;
}
static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
static void doUpdateSqlFunctionForColTagPrj(SQueryInfo* pQueryInfo) {
int32_t tagLength = 0;
size_t size = taosArrayGetSize(pQueryInfo->exprList);
//todo is 0??
//todo is 0??
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
bool isMinRow = false;
bool isMaxRow = false;
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
......@@ -8310,6 +8316,22 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pExpr->base.functionId = TSDB_FUNC_TS_DUMMY; // ts_select ts,top(col,2)
tagLength += pExpr->base.resBytes;
} else if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW) {
isMinRow = true;
} else if (pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
isMaxRow = true;
}
}
if (isMinRow || isMaxRow) {
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW || pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
continue;
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ) {
pExpr->base.functionId = isMinRow ? TSDB_FUNC_MIN_COL_DUMMY : TSDB_FUNC_MAX_COL_DUMMY;
tagLength += pExpr->base.resBytes;
}
}
}
......@@ -8321,8 +8343,9 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
continue;
}
if ((pExpr->base.functionId != TSDB_FUNC_TAG_DUMMY && pExpr->base.functionId != TSDB_FUNC_TS_DUMMY) &&
!(pExpr->base.functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag))) {
if ((pExpr->base.functionId != TSDB_FUNC_TAG_DUMMY && pExpr->base.functionId != TSDB_FUNC_TS_DUMMY &&
pExpr->base.functionId != TSDB_FUNC_MIN_COL_DUMMY && pExpr->base.functionId != TSDB_FUNC_MAX_COL_DUMMY)
&& !(pExpr->base.functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag))) {
SSchema* pColSchema = &pSchema[pExpr->base.colInfo.colIndex];
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->base.functionId, (int32_t)pExpr->base.param[0].i64, &pExpr->base.resType,
&pExpr->base.resBytes, &pExpr->base.interBytes, tagLength, isSTable, NULL);
......@@ -8451,10 +8474,14 @@ static bool check_expr_in_groupby_colum(SGroupbyExpr* pGroupbyExpr, SExprInfo* p
* 2. if selectivity function and tagprj function both exist, there should be only
* one selectivity function exists.
*/
static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
static int32_t checkUpdateColTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
const char* msg1 = "only one selectivity function allowed in presence of tags function";
const char* msg2 = "aggregation function should not be mixed up with projection";
const char* msg3 = "min_row should not be mixed up with max_row";
bool isMinRow = false;
bool isMaxRow = false;
bool isMinMaxRow = false;
bool tagTsColExists = false;
int16_t numOfScalar = 0;
int16_t numOfSelectivity = 0;
......@@ -8471,6 +8498,26 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
tagTsColExists = true; // selectivity + ts/tag column
break;
}
} else if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW) {
isMinRow = true;
} else if (pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
isMaxRow = true;
}
}
if (isMinRow && isMaxRow) {
return invalidOperationMsg(msg, msg3);
} else if (isMinRow || isMaxRow) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo *pExpr = taosArrayGetP(pQueryInfo->exprList, i);
if (pExpr->base.functionId != TSDB_FUNC_PRJ) {
continue;
} else {
if (false == check_expr_in_groupby_colum(pGroupbyExpr, pExpr)) {
isMinMaxRow = true;
break;
}
}
}
}
......@@ -8512,7 +8559,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
}
}
if (tagTsColExists) { // check if the selectivity function exists
if (tagTsColExists || isMinMaxRow) { // check if the selectivity function exists
// When the tag projection function on tag column that is not in the group by clause, aggregation function and
// selectivity function exist in select clause is not allowed.
if (numOfAggregation > 0) {
......@@ -8523,7 +8570,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
* if numOfSelectivity equals to 0, it is a super table projection query
*/
if (numOfSelectivity == 1) {
doUpdateSqlFunctionForTagPrj(pQueryInfo);
doUpdateSqlFunctionForColTagPrj(pQueryInfo);
int32_t code = doUpdateSqlFunctionForColPrj(pQueryInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -8553,7 +8600,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
}
}
doUpdateSqlFunctionForTagPrj(pQueryInfo);
doUpdateSqlFunctionForColTagPrj(pQueryInfo);
int32_t code = doUpdateSqlFunctionForColPrj(pQueryInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -8777,7 +8824,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
}
}
if (checkUpdateTagPrjFunctions(pQueryInfo, msg) != TSDB_CODE_SUCCESS) {
if (checkUpdateColTagPrjFunctions(pQueryInfo, msg) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -8792,7 +8839,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
return TSDB_CODE_SUCCESS;
} else {
return checkUpdateTagPrjFunctions(pQueryInfo, msg);
return checkUpdateColTagPrjFunctions(pQueryInfo, msg);
}
}
......
......@@ -90,8 +90,12 @@ extern "C" {
#define TSDB_FUNC_QSTOP 48
#define TSDB_FUNC_QDURATION 49
#define TSDB_FUNC_HYPERLOGLOG 50
#define TSDB_FUNC_MIN_ROW 51
#define TSDB_FUNC_MAX_ROW 52
#define TSDB_FUNC_MIN_COL_DUMMY 53
#define TSDB_FUNC_MAX_COL_DUMMY 54
#define TSDB_FUNC_MAX_NUM 51
#define TSDB_FUNC_MAX_NUM 55
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -217,6 +221,9 @@ typedef struct SQLFunctionCtx {
SHashObj **pModeSet; // for mode function
STimeWindow qWindow; // for _qstart/_qstop/_qduration column
int32_t allocRows; // rows allocated for output buffer
int16_t minRowIndex;
int16_t maxRowIndex;
bool updateIndex; // whether update index after comparation
} SQLFunctionCtx;
typedef struct SAggFunctionInfo {
......
......@@ -419,8 +419,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
assert(functionId != TSDB_FUNC_SCALAR_EXPR);
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ ||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP)
functionId == TSDB_FUNC_MIN_COL_DUMMY || functionId == TSDB_FUNC_MAX_COL_DUMMY || functionId == TSDB_FUNC_DIFF ||
functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP)
{
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -522,6 +522,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = (dataBytes + DATA_SET_FLAG_SIZE);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (dataBytes + DATA_SET_FLAG_SIZE);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SUM) {
*type = TSDB_DATA_TYPE_BINARY;
......@@ -680,6 +686,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = (int16_t)dataType;
*bytes = dataBytes;
*interBytes = dataBytes + DATA_SET_FLAG_SIZE;
} else if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
*type = (int16_t)dataType;
*bytes = dataBytes;
*interBytes = dataBytes + DATA_SET_FLAG_SIZE;
} else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) {
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -1001,6 +1011,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
#define UPDATE_DATA(ctx, left, right, num, sign, k) \
do { \
if (((left) < (right)) ^ (sign)) { \
(ctx)->updateIndex = true; \
(left) = (right); \
DO_UPDATE_TAG_COLUMNS(ctx, k); \
(num) += 1; \
......@@ -1737,6 +1748,152 @@ static void max_func_merge(SQLFunctionCtx *pCtx) {
}
}
static bool min_row_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized
}
GET_TRUE_DATA_TYPE();
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)pCtx->pOutput) = INT8_MAX;
break;
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *) pCtx->pOutput = UINT8_MAX;
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)pCtx->pOutput) = INT16_MAX;
break;
case TSDB_DATA_TYPE_USMALLINT:
*((uint16_t *)pCtx->pOutput) = UINT16_MAX;
break;
case TSDB_DATA_TYPE_INT:
*((int32_t *)pCtx->pOutput) = INT32_MAX;
break;
case TSDB_DATA_TYPE_UINT:
*((uint32_t *)pCtx->pOutput) = UINT32_MAX;
break;
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)pCtx->pOutput) = INT64_MAX;
break;
case TSDB_DATA_TYPE_UBIGINT:
*((uint64_t *)pCtx->pOutput) = UINT64_MAX;
break;
case TSDB_DATA_TYPE_FLOAT:
*((float *)pCtx->pOutput) = FLT_MAX;
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_VAL(((double *)pCtx->pOutput), DBL_MAX);
break;
default:
qError("illegal data type:%d in min_row query", pCtx->inputType);
}
return true;
}
static bool max_row_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
if (!function_setup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized
}
GET_TRUE_DATA_TYPE();
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)pCtx->pOutput) = INT8_MIN;
break;
case TSDB_DATA_TYPE_UTINYINT:
*((uint8_t *)pCtx->pOutput) = 0;
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)pCtx->pOutput) = INT16_MIN;
break;
case TSDB_DATA_TYPE_USMALLINT:
*((uint16_t *)pCtx->pOutput) = 0;
break;
case TSDB_DATA_TYPE_INT:
*((int32_t *)pCtx->pOutput) = INT32_MIN;
break;
case TSDB_DATA_TYPE_UINT:
*((uint32_t *)pCtx->pOutput) = 0;
break;
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)pCtx->pOutput) = INT64_MIN;
break;
case TSDB_DATA_TYPE_UBIGINT:
*((uint64_t *)pCtx->pOutput) = 0;
break;
case TSDB_DATA_TYPE_FLOAT:
*((float *)pCtx->pOutput) = -FLT_MAX;
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_VAL(((double *)pCtx->pOutput), -DBL_MAX);
break;
default:
qError("illegal data type:%d in max_row query", pCtx->inputType);
}
return true;
}
static void min_row_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
minMax_function(pCtx, pCtx->pOutput, 1, &notNullElems);
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
// set the flag for super table query
if (pCtx->stableQuery) {
*(pCtx->pOutput + pCtx->inputBytes) = DATA_SET_FLAG;
}
}
}
static void max_row_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
minMax_function(pCtx, pCtx->pOutput, 0, &notNullElems);
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
// set the flag for super table query
if (pCtx->stableQuery) {
*(pCtx->pOutput + pCtx->inputBytes) = DATA_SET_FLAG;
}
}
}
static void min_row_func_merge(SQLFunctionCtx *pCtx) {
int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 1);
SET_VAL(pCtx, notNullElems, 1);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG;
}
}
static void max_row_func_merge(SQLFunctionCtx *pCtx) {
int32_t numOfElem = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 0);
SET_VAL(pCtx, numOfElem, 1);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (numOfElem > 0) {
pResInfo->hasResult = DATA_SET_FLAG;
}
}
#define LOOP_STDDEV_IMPL(type, r, d, ctx, delta, _type, num) \
for (int32_t i = 0; i < (ctx)->size; ++i) { \
if ((ctx)->hasNull && isNull((char *)&((type *)d)[i], (_type))) { \
......@@ -3411,6 +3568,72 @@ static void copy_function(SQLFunctionCtx *pCtx) {
assignVal(pCtx->pOutput, pData, pCtx->inputBytes, pCtx->inputType);
}
static char *get_data_by_offset(char *src, int16_t inputType, int32_t inputBytes, int32_t offset) {
char *res = NULL;
switch (inputType) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
res = (char *) ((int8_t *) src + offset);
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
res = (char *) ((int16_t *) src + offset);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
res = (char *) ((int32_t *) src + offset);
break;
case TSDB_DATA_TYPE_FLOAT:
res = (char *) ((float *) src + offset);
break;
case TSDB_DATA_TYPE_DOUBLE:
res = (char *) ((double *) src + offset);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
res = (char *) ((int64_t *) src + offset);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
res = src + offset * inputBytes;
break;
default: {
res = src;
}
}
return res;
}
static void min_row_copy_function(SQLFunctionCtx *pCtx) {
int16_t index = pCtx->minRowIndex;
if (index < 0 || !pCtx->updateIndex) {
return;
}
SET_VAL(pCtx, pCtx->size, 1);
char *pData = GET_INPUT_DATA_LIST(pCtx);
pData = get_data_by_offset(pData, pCtx->inputType, pCtx->inputBytes, index);
assignVal(pCtx->pOutput, pData, pCtx->inputBytes, pCtx->inputType);
}
static void max_row_copy_function(SQLFunctionCtx *pCtx) {
int16_t index = pCtx->maxRowIndex;
if (index < 0 || !pCtx->updateIndex) {
return;
}
SET_VAL(pCtx, pCtx->size, 1);
char *pData = GET_INPUT_DATA_LIST(pCtx);
pData = get_data_by_offset(pData, pCtx->inputType, pCtx->inputBytes, index);
assignVal(pCtx->pOutput, pData, pCtx->inputBytes, pCtx->inputType);
}
static void full_copy_function(SQLFunctionCtx *pCtx) {
copy_function(pCtx);
......@@ -6052,8 +6275,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration, qstart, qstop, qduration, hyperloglog
1, 1, 1, 1, 1, 1, 1, 1, 1,
// stateCount, stateDuration, wstart, wstop, wduration, qstart, qstop, qduration, hyperloglog, min_row, max_row
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6670,5 +6893,53 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
hll_func_finalizer,
hll_func_merge,
dataBlockRequired,
},
{
// 51
"min_row",
TSDB_FUNC_MIN_ROW,
TSDB_FUNC_MIN_ROW,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
min_row_func_setup,
min_row_function,
function_finalizer,
min_row_func_merge,
dataBlockRequired,
},
{
// 52
"max_row",
TSDB_FUNC_MAX_ROW,
TSDB_FUNC_MAX_ROW,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
max_row_func_setup,
max_row_function,
function_finalizer,
max_row_func_merge,
dataBlockRequired,
},
{
// 53
"min_col_dummy",
TSDB_FUNC_MIN_COL_DUMMY,
TSDB_FUNC_MIN_COL_DUMMY,
TSDB_BASE_FUNC_SO,
function_setup,
min_row_copy_function,
doFinalizer,
copy_function,
noDataRequired,
},
{
// 54
"max_col_dummy",
TSDB_FUNC_MAX_COL_DUMMY,
TSDB_FUNC_MAX_COL_DUMMY,
TSDB_BASE_FUNC_SO,
function_setup,
max_row_copy_function,
doFinalizer,
copy_function,
noDataRequired,
}
};
......@@ -413,7 +413,9 @@ static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput
continue;
}
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY ||
functId == TSDB_FUNC_MIN_COL_DUMMY || functId == TSDB_FUNC_MAX_COL_DUMMY)
{
hasTags = true;
continue;
}
......@@ -437,7 +439,9 @@ static bool isScalarWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
continue;
}
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY ||
functId == TSDB_FUNC_MIN_COL_DUMMY || functId == TSDB_FUNC_MAX_COL_DUMMY)
{
hasTags = true;
continue;
}
......@@ -1233,6 +1237,9 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
int16_t minRowIndex = -1, maxRowIndex = -1;
bool updateIndex = false;
int32_t minMaxRowColIndex = -1;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
......@@ -1243,7 +1250,32 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
if (minMaxRowColIndex == -1) {
minMaxRowColIndex = k;
}
pCtx[k].updateIndex = false;
} else {
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
}
aAggs[functionId].xFunction(&pCtx[k]);
if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
updateIndex = pCtx[k].updateIndex;
// find the minIndex or maxIndex of this column to detemine the index of other columns
if (functionId == TSDB_FUNC_MIN_ROW) {
minRowIndex = pCtx[k].preAggVals.statis.minIndex;
}
if (functionId == TSDB_FUNC_MAX_ROW) {
maxRowIndex = pCtx[k].preAggVals.statis.maxIndex;
}
}
} else {
assert(0);
}
......@@ -1254,6 +1286,30 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
}
}
}
// update the indices of columns before the one in min_row/max_row
if (updateIndex) {
for (int32_t k = 0; k < minMaxRowColIndex; ++k) {
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
pCtx[k].startTs = startTs;
int32_t functionId = pCtx[k].functionId;
if (functionId != TSDB_FUNC_MIN_COL_DUMMY && functionId != TSDB_FUNC_MAX_COL_DUMMY) {
continue;
}
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
aAggs[functionId].xFunction(&pCtx[k]);
pCtx[k].minRowIndex = -1;
pCtx[k].maxRowIndex = -1;
pCtx[k].updateIndex = false;
}
}
}
}
static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) {
......@@ -1949,7 +2005,9 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
continue;
}
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { //ts_select ts,top(col,2)
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY ||
functionId == TSDB_FUNC_MIN_COL_DUMMY || functionId == TSDB_FUNC_MAX_COL_DUMMY)
{ //ts_select ts,top(col,2)
tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i];
} else if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
......@@ -2024,6 +2082,9 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->end.key = INT64_MIN;
pCtx->startTs = INT64_MIN;
pCtx->minRowIndex = -1;
pCtx->maxRowIndex = -1;
pCtx->qWindow = pQueryAttr->window;
pCtx->allocRows = numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册