提交 fea38333 编写于 作者: X xywang

feat: enhanced min_row & max_row functions

上级 dfeb75fe
......@@ -603,7 +603,7 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_COL_DUMMY) {
continue;
}
......@@ -625,7 +625,7 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_
static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) {
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_COL_DUMMY) {
continue;
}
......
......@@ -7770,10 +7770,20 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
bool isProjectionFunction = false;
bool minMaxRowExists = false;
const char* msg1 = "functions not compatible with interval";
// multi-output set/ todo refactor
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t k = 0; k < size; ++k) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, k);
if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW || pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
minMaxRowExists = true;
break;
}
}
for (int32_t k = 0; k < size; ++k) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, k);
......@@ -7794,7 +7804,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
}
// projection query on primary timestamp, the selectivity function needs to be present.
if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (minMaxRowExists || (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
bool hasSelectivity = false;
for (int32_t j = 0; j < size; ++j) {
SExprInfo* pEx = tscExprGet(pQueryInfo, j);
......@@ -8305,8 +8315,7 @@ static void doUpdateSqlFunctionForColTagPrj(SQueryInfo* pQueryInfo) {
//todo is 0??
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
bool isMinRow = false;
bool isMaxRow = false;
bool minMaxRowExists = false;
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
......@@ -8316,20 +8325,18 @@ static void doUpdateSqlFunctionForColTagPrj(SQueryInfo* pQueryInfo) {
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pExpr->base.functionId = TSDB_FUNC_TS_DUMMY; // ts_select ts,top(col,2)
tagLength += pExpr->base.resBytes;
} else if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW) {
isMinRow = true;
} else if (pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
isMaxRow = true;
} else if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW || pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
minMaxRowExists = true;
}
}
if (isMinRow || isMaxRow) {
if (minMaxRowExists) {
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW || pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
continue;
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ) {
pExpr->base.functionId = isMinRow ? TSDB_FUNC_MIN_COL_DUMMY : TSDB_FUNC_MAX_COL_DUMMY;
pExpr->base.functionId = TSDB_FUNC_COL_DUMMY;
tagLength += pExpr->base.resBytes;
}
}
......@@ -8344,7 +8351,7 @@ static void doUpdateSqlFunctionForColTagPrj(SQueryInfo* pQueryInfo) {
}
if ((pExpr->base.functionId != TSDB_FUNC_TAG_DUMMY && pExpr->base.functionId != TSDB_FUNC_TS_DUMMY &&
pExpr->base.functionId != TSDB_FUNC_MIN_COL_DUMMY && pExpr->base.functionId != TSDB_FUNC_MAX_COL_DUMMY)
pExpr->base.functionId != TSDB_FUNC_COL_DUMMY)
&& !(pExpr->base.functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag))) {
SSchema* pColSchema = &pSchema[pExpr->base.colInfo.colIndex];
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->base.functionId, (int32_t)pExpr->base.param[0].i64, &pExpr->base.resType,
......@@ -8478,10 +8485,10 @@ static int32_t checkUpdateColTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg)
const char* msg1 = "only one selectivity function allowed in presence of tags function";
const char* msg2 = "aggregation function should not be mixed up with projection";
const char* msg3 = "min_row should not be mixed up with max_row";
const char* msg4 = "only one selectivity function allowed in presence of min_row or max_row function";
bool isMinRow = false;
bool isMaxRow = false;
bool isMinMaxRow = false;
bool minRowExists = false;
bool maxRowExists = false;
bool tagTsColExists = false;
int16_t numOfScalar = 0;
int16_t numOfSelectivity = 0;
......@@ -8499,26 +8506,14 @@ static int32_t checkUpdateColTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg)
break;
}
} else if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW) {
isMinRow = true;
minRowExists = true;
} else if (pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
isMaxRow = true;
maxRowExists = true;
}
}
if (isMinRow && isMaxRow) {
if (minRowExists && maxRowExists) {
return invalidOperationMsg(msg, msg3);
} else if (isMinRow || isMaxRow) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo *pExpr = taosArrayGetP(pQueryInfo->exprList, i);
if (pExpr->base.functionId != TSDB_FUNC_PRJ) {
continue;
} else {
if (false == check_expr_in_groupby_colum(pGroupbyExpr, pExpr)) {
isMinMaxRow = true;
break;
}
}
}
}
for (int32_t i = 0; i < numOfExprs; ++i) {
......@@ -8559,7 +8554,7 @@ static int32_t checkUpdateColTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg)
}
}
if (tagTsColExists || isMinMaxRow) { // check if the selectivity function exists
if (tagTsColExists || minRowExists || maxRowExists) { // check if the selectivity function exists
// When the tag projection function on tag column that is not in the group by clause, aggregation function and
// selectivity function exist in select clause is not allowed.
if (numOfAggregation > 0) {
......@@ -8596,7 +8591,13 @@ static int32_t checkUpdateColTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg)
(functionId == TSDB_FUNC_LAST_DST && (pExpr->base.colInfo.flag & TSDB_COL_NULL) != 0)) {
// do nothing
} else {
return invalidOperationMsg(msg, msg1);
if (tagTsColExists) {
return invalidOperationMsg(msg, msg1);
}
if (minRowExists || maxRowExists) {
return invalidOperationMsg(msg, msg4);
}
}
}
......
......@@ -92,10 +92,15 @@ extern "C" {
#define TSDB_FUNC_HYPERLOGLOG 50
#define TSDB_FUNC_MIN_ROW 51
#define TSDB_FUNC_MAX_ROW 52
#define TSDB_FUNC_MIN_COL_DUMMY 53
#define TSDB_FUNC_MAX_COL_DUMMY 54
#define TSDB_FUNC_COL_DUMMY 53
#define TSDB_FUNC_MAX_NUM 55
#define TSDB_FUNC_MAX_NUM 54
enum {
FUNC_NOT_VAL,
FUNC_MIN_ROW,
FUNC_MAX_ROW
};
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -223,6 +228,7 @@ typedef struct SQLFunctionCtx {
int32_t allocRows; // rows allocated for output buffer
int16_t minRowIndex;
int16_t maxRowIndex;
int16_t minMaxRowType;
bool updateIndex; // whether update index after comparation
} SQLFunctionCtx;
......
......@@ -419,8 +419,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
assert(functionId != TSDB_FUNC_SCALAR_EXPR);
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
functionId == TSDB_FUNC_MIN_COL_DUMMY || functionId == TSDB_FUNC_MAX_COL_DUMMY || functionId == TSDB_FUNC_DIFF ||
functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP)
functionId == TSDB_FUNC_COL_DUMMY || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ ||
functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP)
{
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -1028,13 +1028,27 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
} while (0)
#define LOOPCHECK_N(val, list, ctx, tsdbType, sign, num) \
int32_t updateCount = 0; \
for (int32_t i = 0; i < ((ctx)->size); ++i) { \
if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \
continue; \
} \
TSKEY key = (ctx)->ptsList != NULL? GET_TS_DATA(ctx, i):0; \
(ctx)->updateIndex = false; \
UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \
}
if (!(ctx)->preAggVals.isSet) { \
if ((ctx)->updateIndex) { \
if (sign && (ctx)->preAggVals.statis.minIndex != i) { \
(ctx)->preAggVals.statis.minIndex = i; \
} \
if (!sign && (ctx)->preAggVals.statis.maxIndex != i) { \
(ctx)->preAggVals.statis.maxIndex = i; \
} \
updateCount++; \
} \
} \
} \
(ctx)->updateIndex = updateCount > 0 ? true : false; \
#define TYPED_LOOPCHECK_N(type, data, list, ctx, tsdbType, sign, notNullElems) \
do { \
......@@ -3608,22 +3622,20 @@ static char *get_data_by_offset(char *src, int16_t inputType, int32_t inputBytes
return res;
}
static void min_row_copy_function(SQLFunctionCtx *pCtx) {
int16_t index = pCtx->minRowIndex;
if (index < 0 || !pCtx->updateIndex) {
static void row_copy_function(SQLFunctionCtx *pCtx) {
int16_t index;
if (pCtx->minMaxRowType == FUNC_NOT_VAL || !pCtx->updateIndex) {
return;
}
SET_VAL(pCtx, pCtx->size, 1);
char *pData = GET_INPUT_DATA_LIST(pCtx);
pData = get_data_by_offset(pData, pCtx->inputType, pCtx->inputBytes, index);
assignVal(pCtx->pOutput, pData, pCtx->inputBytes, pCtx->inputType);
}
if (pCtx->minMaxRowType == FUNC_MIN_ROW) {
index = pCtx->minRowIndex;
} else {
index = pCtx->maxRowIndex;
}
static void max_row_copy_function(SQLFunctionCtx *pCtx) {
int16_t index = pCtx->maxRowIndex;
if (index < 0 || !pCtx->updateIndex) {
if (index < 0) {
return;
}
......@@ -6920,24 +6932,12 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
},
{
// 53
"min_col_dummy",
TSDB_FUNC_MIN_COL_DUMMY,
TSDB_FUNC_MIN_COL_DUMMY,
TSDB_BASE_FUNC_SO,
function_setup,
min_row_copy_function,
doFinalizer,
copy_function,
noDataRequired,
},
{
// 54
"max_col_dummy",
TSDB_FUNC_MAX_COL_DUMMY,
TSDB_FUNC_MAX_COL_DUMMY,
"col_dummy",
TSDB_FUNC_COL_DUMMY,
TSDB_FUNC_COL_DUMMY,
TSDB_BASE_FUNC_SO,
function_setup,
max_row_copy_function,
row_copy_function,
doFinalizer,
copy_function,
noDataRequired,
......
......@@ -413,9 +413,7 @@ static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput
continue;
}
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY ||
functId == TSDB_FUNC_MIN_COL_DUMMY || functId == TSDB_FUNC_MAX_COL_DUMMY)
{
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY || functId == TSDB_FUNC_COL_DUMMY) {
hasTags = true;
continue;
}
......@@ -439,9 +437,7 @@ static bool isScalarWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
continue;
}
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY ||
functId == TSDB_FUNC_MIN_COL_DUMMY || functId == TSDB_FUNC_MAX_COL_DUMMY)
{
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY || functId == TSDB_FUNC_COL_DUMMY) {
hasTags = true;
continue;
}
......@@ -949,6 +945,10 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int16_t minRowIndex = -1, maxRowIndex = -1;
bool updateIndex = false;
int32_t minMaxRowColIndex = -1;
int16_t minMaxRowType = FUNC_NOT_VAL;
for (int32_t k = 0; k < numOfOutput; ++k) {
bool hasAggregates = pCtx[k].preAggVals.isSet;
......@@ -981,7 +981,39 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
if (minMaxRowColIndex == -1) {
minMaxRowColIndex = k;
}
if (functionId == TSDB_FUNC_MIN_ROW) {
minMaxRowType = FUNC_MIN_ROW;
} else {
minMaxRowType = FUNC_MAX_ROW;
}
pCtx[k].updateIndex = false;
} else {
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
pCtx[k].minMaxRowType = minMaxRowType;
}
aAggs[functionId].xFunction(&pCtx[k]);
if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
updateIndex = pCtx[k].updateIndex;
// find the minIndex or maxIndex of this column to detemine the index of other columns
if (functionId == TSDB_FUNC_MIN_ROW) {
minRowIndex = pCtx[k].preAggVals.statis.minIndex;
}
if (functionId == TSDB_FUNC_MAX_ROW) {
maxRowIndex = pCtx[k].preAggVals.statis.maxIndex;
}
}
} else {
assert(0);
}
......@@ -996,6 +1028,58 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
pCtx[k].preAggVals.isSet = hasAggregates;
pCtx[k].pInput = start;
}
// update the indices of columns before the one in min_row/max_row
if (updateIndex) {
for (int32_t k = 0; k < minMaxRowColIndex; ++k) {
bool hasAggregates = pCtx[k].preAggVals.isSet;
pCtx[k].size = forwardStep;
pCtx[k].startTs = pWin->skey;
pCtx[k].endTs = pWin->ekey;
// keep it temporarialy
char* start = pCtx[k].pInput;
int32_t pos = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? offset : offset - (forwardStep - 1);
if (pCtx[k].pInput != NULL) {
pCtx[k].pInput = (char *)pCtx[k].pInput + pos * pCtx[k].inputBytes;
}
if (tsCol != NULL) {
pCtx[k].ptsList = &tsCol[pos];
}
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
int32_t functionId = pCtx[k].functionId;
if (functionId != TSDB_FUNC_COL_DUMMY) {
continue;
}
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
pCtx[k].minMaxRowType = minMaxRowType;
aAggs[functionId].xFunction(&pCtx[k]);
pCtx[k].minRowIndex = -1;
pCtx[k].maxRowIndex = -1;
pCtx[k].updateIndex = false;
pCtx[k].minMaxRowType = FUNC_NOT_VAL;
}
// restore it
pCtx[k].preAggVals.isSet = hasAggregates;
pCtx[k].pInput = start;
}
}
}
......@@ -1238,8 +1322,9 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
int16_t minRowIndex = -1, maxRowIndex = -1;
bool updateIndex = false;
bool updateIndex = false;
int32_t minMaxRowColIndex = -1;
int16_t minMaxRowType = FUNC_NOT_VAL;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
......@@ -1255,11 +1340,18 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
minMaxRowColIndex = k;
}
if (functionId == TSDB_FUNC_MIN_ROW) {
minMaxRowType = FUNC_MIN_ROW;
} else {
minMaxRowType = FUNC_MAX_ROW;
}
pCtx[k].updateIndex = false;
} else {
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
pCtx[k].minMaxRowType = minMaxRowType;
}
aAggs[functionId].xFunction(&pCtx[k]);
......@@ -1294,19 +1386,21 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
pCtx[k].startTs = startTs;
int32_t functionId = pCtx[k].functionId;
if (functionId != TSDB_FUNC_MIN_COL_DUMMY && functionId != TSDB_FUNC_MAX_COL_DUMMY) {
if (functionId != TSDB_FUNC_COL_DUMMY) {
continue;
}
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
pCtx[k].minMaxRowType = minMaxRowType;
aAggs[functionId].xFunction(&pCtx[k]);
pCtx[k].minRowIndex = -1;
pCtx[k].maxRowIndex = -1;
pCtx[k].updateIndex = false;
pCtx[k].minMaxRowType = FUNC_NOT_VAL;
}
}
}
......@@ -2005,9 +2099,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
continue;
}
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY ||
functionId == TSDB_FUNC_MIN_COL_DUMMY || functionId == TSDB_FUNC_MAX_COL_DUMMY)
{ //ts_select ts,top(col,2)
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_COL_DUMMY) { //ts_select ts,top(col,2)
tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i];
} else if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册