未验证 提交 e58c53c6 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #14308 from taosdata/feature/TS-841

feat: added min_row & max_row functions
......@@ -603,7 +603,7 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_COL_DUMMY) {
continue;
}
......@@ -625,7 +625,7 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_
static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) {
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_COL_DUMMY) {
continue;
}
......
......@@ -2876,6 +2876,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_TWA:
case TSDB_FUNC_MIN:
case TSDB_FUNC_MAX:
case TSDB_FUNC_MIN_ROW:
case TSDB_FUNC_MAX_ROW:
case TSDB_FUNC_DIFF:
case TSDB_FUNC_DERIVATIVE:
case TSDB_FUNC_CSUM:
......@@ -2962,6 +2964,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) &&
(functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
int16_t resultType = 0;
......@@ -7768,10 +7772,20 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
bool isProjectionFunction = false;
bool minMaxRowExists = false;
const char* msg1 = "functions not compatible with interval";
// multi-output set/ todo refactor
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t k = 0; k < size; ++k) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, k);
if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW || pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
minMaxRowExists = true;
break;
}
}
for (int32_t k = 0; k < size; ++k) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, k);
......@@ -7792,7 +7806,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
}
// projection query on primary timestamp, the selectivity function needs to be present.
if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (minMaxRowExists || (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
bool hasSelectivity = false;
for (int32_t j = 0; j < size; ++j) {
SExprInfo* pEx = tscExprGet(pQueryInfo, j);
......@@ -8296,13 +8310,14 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC
pInfo->visible = false;
}
static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
static void doUpdateSqlFunctionForColTagPrj(SQueryInfo* pQueryInfo) {
int32_t tagLength = 0;
size_t size = taosArrayGetSize(pQueryInfo->exprList);
//todo is 0??
//todo is 0??
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
bool minMaxRowExists = false;
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
......@@ -8312,6 +8327,20 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pExpr->base.functionId = TSDB_FUNC_TS_DUMMY; // ts_select ts,top(col,2)
tagLength += pExpr->base.resBytes;
} else if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW || pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
minMaxRowExists = true;
}
}
if (minMaxRowExists) {
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW || pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
continue;
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ) {
pExpr->base.functionId = TSDB_FUNC_COL_DUMMY;
tagLength += pExpr->base.resBytes;
}
}
}
......@@ -8323,8 +8352,9 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
continue;
}
if ((pExpr->base.functionId != TSDB_FUNC_TAG_DUMMY && pExpr->base.functionId != TSDB_FUNC_TS_DUMMY) &&
!(pExpr->base.functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag))) {
if ((pExpr->base.functionId != TSDB_FUNC_TAG_DUMMY && pExpr->base.functionId != TSDB_FUNC_TS_DUMMY &&
pExpr->base.functionId != TSDB_FUNC_COL_DUMMY)
&& !(pExpr->base.functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag))) {
SSchema* pColSchema = &pSchema[pExpr->base.colInfo.colIndex];
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->base.functionId, (int32_t)pExpr->base.param[0].i64, &pExpr->base.resType,
&pExpr->base.resBytes, &pExpr->base.interBytes, tagLength, isSTable, NULL);
......@@ -8453,10 +8483,14 @@ static bool check_expr_in_groupby_colum(SGroupbyExpr* pGroupbyExpr, SExprInfo* p
* 2. if selectivity function and tagprj function both exist, there should be only
* one selectivity function exists.
*/
static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
static int32_t checkUpdateColTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
const char* msg1 = "only one selectivity function allowed in presence of tags function";
const char* msg2 = "aggregation function should not be mixed up with projection";
const char* msg3 = "min_row should not be mixed up with max_row";
const char* msg4 = "only one selectivity function allowed in presence of min_row or max_row function";
bool minRowExists = false;
bool maxRowExists = false;
bool tagTsColExists = false;
int16_t numOfScalar = 0;
int16_t numOfSelectivity = 0;
......@@ -8473,9 +8507,17 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
tagTsColExists = true; // selectivity + ts/tag column
break;
}
} else if (pExpr->base.functionId == TSDB_FUNC_MIN_ROW) {
minRowExists = true;
} else if (pExpr->base.functionId == TSDB_FUNC_MAX_ROW) {
maxRowExists = true;
}
}
if (minRowExists && maxRowExists) {
return invalidOperationMsg(msg, msg3);
}
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = taosArrayGetP(pQueryInfo->exprList, i);
......@@ -8514,7 +8556,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
}
}
if (tagTsColExists) { // check if the selectivity function exists
if (tagTsColExists || minRowExists || maxRowExists) { // check if the selectivity function exists
// When the tag projection function on tag column that is not in the group by clause, aggregation function and
// selectivity function exist in select clause is not allowed.
if (numOfAggregation > 0) {
......@@ -8525,7 +8567,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
* if numOfSelectivity equals to 0, it is a super table projection query
*/
if (numOfSelectivity == 1) {
doUpdateSqlFunctionForTagPrj(pQueryInfo);
doUpdateSqlFunctionForColTagPrj(pQueryInfo);
int32_t code = doUpdateSqlFunctionForColPrj(pQueryInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -8551,11 +8593,17 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
(functionId == TSDB_FUNC_LAST_DST && (pExpr->base.colInfo.flag & TSDB_COL_NULL) != 0)) {
// do nothing
} else {
return invalidOperationMsg(msg, msg1);
if (tagTsColExists) {
return invalidOperationMsg(msg, msg1);
}
if (minRowExists || maxRowExists) {
return invalidOperationMsg(msg, msg4);
}
}
}
doUpdateSqlFunctionForTagPrj(pQueryInfo);
doUpdateSqlFunctionForColTagPrj(pQueryInfo);
int32_t code = doUpdateSqlFunctionForColPrj(pQueryInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -8779,7 +8827,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
}
}
if (checkUpdateTagPrjFunctions(pQueryInfo, msg) != TSDB_CODE_SUCCESS) {
if (checkUpdateColTagPrjFunctions(pQueryInfo, msg) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -8794,7 +8842,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
return TSDB_CODE_SUCCESS;
} else {
return checkUpdateTagPrjFunctions(pQueryInfo, msg);
return checkUpdateColTagPrjFunctions(pQueryInfo, msg);
}
}
......
......@@ -90,8 +90,17 @@ extern "C" {
#define TSDB_FUNC_QSTOP 48
#define TSDB_FUNC_QDURATION 49
#define TSDB_FUNC_HYPERLOGLOG 50
#define TSDB_FUNC_MIN_ROW 51
#define TSDB_FUNC_MAX_ROW 52
#define TSDB_FUNC_COL_DUMMY 53
#define TSDB_FUNC_MAX_NUM 51
#define TSDB_FUNC_MAX_NUM 54
/*
 * Tag stored in SQLFunctionCtx.minMaxRowType telling row_copy_function which
 * row index (minRowIndex or maxRowIndex) it has to copy from.
 */
enum {
  FUNC_NOT_VAL = 0,  // no min_row/max_row seen; row_copy_function returns without copying
  FUNC_MIN_ROW = 1,  // copy the row located at minRowIndex
  FUNC_MAX_ROW = 2   // copy the row located at maxRowIndex
};
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -217,6 +226,10 @@ typedef struct SQLFunctionCtx {
SHashObj **pModeSet; // for mode function
STimeWindow qWindow; // for _qstart/_qstop/_qduration column
int32_t allocRows; // rows allocated for output buffer
int16_t minRowIndex;
int16_t maxRowIndex;
int16_t minMaxRowType;
bool updateIndex; // whether update index after comparation
} SQLFunctionCtx;
typedef struct SAggFunctionInfo {
......
......@@ -419,8 +419,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
assert(functionId != TSDB_FUNC_SCALAR_EXPR);
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ ||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP)
functionId == TSDB_FUNC_COL_DUMMY || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ ||
functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP)
{
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -522,6 +522,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = (dataBytes + DATA_SET_FLAG_SIZE);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (dataBytes + DATA_SET_FLAG_SIZE);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SUM) {
*type = TSDB_DATA_TYPE_BINARY;
......@@ -680,6 +686,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = (int16_t)dataType;
*bytes = dataBytes;
*interBytes = dataBytes + DATA_SET_FLAG_SIZE;
} else if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
*type = (int16_t)dataType;
*bytes = dataBytes;
*interBytes = dataBytes + DATA_SET_FLAG_SIZE;
} else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) {
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -1001,6 +1011,7 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
#define UPDATE_DATA(ctx, left, right, num, sign, k) \
do { \
if (((left) < (right)) ^ (sign)) { \
(ctx)->updateIndex = true; \
(left) = (right); \
DO_UPDATE_TAG_COLUMNS(ctx, k); \
(num) += 1; \
......@@ -1017,13 +1028,27 @@ int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
} while (0)
#define LOOPCHECK_N(val, list, ctx, tsdbType, sign, num) \
int32_t updateCount = 0; \
for (int32_t i = 0; i < ((ctx)->size); ++i) { \
if ((ctx)->hasNull && isNull((char *)&(list)[i], tsdbType)) { \
continue; \
} \
TSKEY key = (ctx)->ptsList != NULL? GET_TS_DATA(ctx, i):0; \
(ctx)->updateIndex = false; \
UPDATE_DATA(ctx, val, (list)[i], num, sign, key); \
}
if (!(ctx)->preAggVals.isSet) { \
if ((ctx)->updateIndex) { \
if (sign && (ctx)->preAggVals.statis.minIndex != i) { \
(ctx)->preAggVals.statis.minIndex = i; \
} \
if (!sign && (ctx)->preAggVals.statis.maxIndex != i) { \
(ctx)->preAggVals.statis.maxIndex = i; \
} \
updateCount++; \
} \
} \
} \
(ctx)->updateIndex = updateCount > 0 ? true : false; \
#define TYPED_LOOPCHECK_N(type, data, list, ctx, tsdbType, sign, notNullElems) \
do { \
......@@ -1406,6 +1431,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
#endif
if ((*data < val) ^ isMin) {
pCtx->updateIndex = true;
*data = (int32_t)val;
for (int32_t i = 0; i < (pCtx)->tagInfo.numOfTagCols; ++i) {
SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i];
......@@ -1465,13 +1491,17 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
int32_t *pData = p;
int32_t *retVal = (int32_t*) pOutput;
int32_t updateCount = 0;
for (int32_t i = 0; i < pCtx->size; ++i) {
if (pCtx->hasNull && isNull((const char*)&pData[i], pCtx->inputType)) {
continue;
}
pCtx->updateIndex = false;
if ((*retVal < pData[i]) ^ isMin) {
pCtx->updateIndex = true;
*retVal = pData[i];
if(tsList) {
TSKEY k = tsList[i];
......@@ -1479,7 +1509,21 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
}
}
*notNullElems += 1;
if (!pCtx->preAggVals.isSet) {
if (pCtx->updateIndex) {
if (isMin && pCtx->preAggVals.statis.minIndex != i) {
pCtx->preAggVals.statis.minIndex = i;
}
if (!isMin && pCtx->preAggVals.statis.maxIndex != i) {
pCtx->preAggVals.statis.maxIndex = i;
}
updateCount++;
}
}
}
pCtx->updateIndex = updateCount > 0 ? true : false;
#if defined(_DEBUG_VIEW)
qDebug("max value updated:%d", *retVal);
#endif
......@@ -1737,6 +1781,152 @@ static void max_func_merge(SQLFunctionCtx *pCtx) {
}
}
/*
 * Setup hook for min_row.
 *
 * Runs the shared function_setup() and, when that succeeds, seeds the output
 * cell with the largest value of the column type (INTn_MAX / UINTn_MAX /
 * FLT_MAX / DBL_MAX) as the starting point for the running minimum.
 *
 * Returns false when function_setup() returns false, true otherwise. An
 * unsupported type is only logged; the output cell is left untouched and the
 * function still returns true.
 */
static bool min_row_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
  if (!function_setup(pCtx, pResultInfo)) {
    return false;  // not initialized since it has been initialized
  }

  // Provides `type`, the value switched on below (macro defined outside this view).
  GET_TRUE_DATA_TYPE();

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t *)pCtx->pOutput) = INT8_MAX;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *((uint8_t *)pCtx->pOutput) = UINT8_MAX;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t *)pCtx->pOutput) = INT16_MAX;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t *)pCtx->pOutput) = UINT16_MAX;
      break;
    case TSDB_DATA_TYPE_INT:
      *((int32_t *)pCtx->pOutput) = INT32_MAX;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t *)pCtx->pOutput) = UINT32_MAX;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t *)pCtx->pOutput) = INT64_MAX;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t *)pCtx->pOutput) = UINT64_MAX;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *((float *)pCtx->pOutput) = FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double *)pCtx->pOutput), DBL_MAX);
      break;
    default:
      // Log the value that was actually rejected by the switch, not
      // pCtx->inputType, which may differ from the resolved type.
      qError("illegal data type:%d in min_row query", (int)type);
      break;
  }

  return true;
}
/*
 * Setup hook for max_row.
 *
 * Runs the shared function_setup() and, when that succeeds, seeds the output
 * cell with the smallest value of the column type (INTn_MIN for signed
 * integers, 0 for unsigned integers, -FLT_MAX / -DBL_MAX for floating point)
 * as the starting point for the running maximum.
 *
 * Returns false when function_setup() returns false, true otherwise. An
 * unsupported type is only logged; the output cell is left untouched and the
 * function still returns true.
 */
static bool max_row_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
  if (!function_setup(pCtx, pResultInfo)) {
    return false;  // not initialized since it has been initialized
  }

  // Provides `type`, the value switched on below (macro defined outside this view).
  GET_TRUE_DATA_TYPE();

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t *)pCtx->pOutput) = INT8_MIN;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *((uint8_t *)pCtx->pOutput) = 0;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t *)pCtx->pOutput) = INT16_MIN;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t *)pCtx->pOutput) = 0;
      break;
    case TSDB_DATA_TYPE_INT:
      *((int32_t *)pCtx->pOutput) = INT32_MIN;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t *)pCtx->pOutput) = 0;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t *)pCtx->pOutput) = INT64_MIN;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t *)pCtx->pOutput) = 0;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *((float *)pCtx->pOutput) = -FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double *)pCtx->pOutput), -DBL_MAX);
      break;
    default:
      // Log the value that was actually rejected by the switch, not
      // pCtx->inputType, which may differ from the resolved type.
      qError("illegal data type:%d in max_row query", (int)type);
      break;
  }

  return true;
}
/*
 * Per-block worker for min_row: delegates the scan to minMax_function() in
 * "min" mode (isMin = 1) and, when at least one non-null element was seen,
 * marks the result cell as populated.
 */
static void min_row_function(SQLFunctionCtx *pCtx) {
  int32_t validCount = 0;

  minMax_function(pCtx, pCtx->pOutput, 1, &validCount);
  SET_VAL(pCtx, validCount, 1);

  if (validCount <= 0) {
    return;  // nothing but nulls in this block: leave the result flags alone
  }

  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);
  pCell->hasResult = DATA_SET_FLAG;

  // super table query: the flag byte sits right after the value in the output buffer
  if (pCtx->stableQuery) {
    pCtx->pOutput[pCtx->inputBytes] = DATA_SET_FLAG;
  }
}
/*
 * Per-block worker for max_row: delegates the scan to minMax_function() in
 * "max" mode (isMin = 0) and, when at least one non-null element was seen,
 * marks the result cell as populated.
 */
static void max_row_function(SQLFunctionCtx *pCtx) {
  int32_t validCount = 0;

  minMax_function(pCtx, pCtx->pOutput, 0, &validCount);
  SET_VAL(pCtx, validCount, 1);

  if (validCount <= 0) {
    return;  // nothing but nulls in this block: leave the result flags alone
  }

  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);
  pCell->hasResult = DATA_SET_FLAG;

  // super table query: the flag byte sits right after the value in the output buffer
  if (pCtx->stableQuery) {
    pCtx->pOutput[pCtx->inputBytes] = DATA_SET_FLAG;
  }
}
/*
 * Merge hook for min_row: folds the partial results via minmax_merge_impl()
 * in "min" mode and flags the result cell when anything was merged.
 */
static void min_row_func_merge(SQLFunctionCtx *pCtx) {
  int32_t mergedCount = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 1);
  SET_VAL(pCtx, mergedCount, 1);

  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);
  if (mergedCount <= 0) {
    return;
  }

  pCell->hasResult = DATA_SET_FLAG;
}
/*
 * Merge hook for max_row: folds the partial results via minmax_merge_impl()
 * in "max" mode and flags the result cell when anything was merged.
 */
static void max_row_func_merge(SQLFunctionCtx *pCtx) {
  int32_t mergedCount = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 0);
  SET_VAL(pCtx, mergedCount, 1);

  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);
  if (mergedCount <= 0) {
    return;
  }

  pCell->hasResult = DATA_SET_FLAG;
}
#define LOOP_STDDEV_IMPL(type, r, d, ctx, delta, _type, num) \
for (int32_t i = 0; i < (ctx)->size; ++i) { \
if ((ctx)->hasNull && isNull((char *)&((type *)d)[i], (_type))) { \
......@@ -3412,6 +3602,70 @@ static void copy_function(SQLFunctionCtx *pCtx) {
assignVal(pCtx->pOutput, pData, pCtx->inputBytes, pCtx->inputType);
}
/*
 * Returns the address of element number `offset` in the column buffer `src`.
 *
 * Fixed-width types advance by the width of the matching C type; BINARY and
 * NCHAR advance by `inputBytes` per element. Any other type returns `src`
 * unchanged, ignoring `offset`.
 */
static char *get_data_by_offset(char *src, int16_t inputType, int32_t inputBytes, int32_t offset) {
  int64_t elemWidth = 0;

  switch (inputType) {
    case TSDB_DATA_TYPE_BOOL:
    case TSDB_DATA_TYPE_TINYINT:
    case TSDB_DATA_TYPE_UTINYINT:
      elemWidth = (int64_t)sizeof(int8_t);
      break;

    case TSDB_DATA_TYPE_SMALLINT:
    case TSDB_DATA_TYPE_USMALLINT:
      elemWidth = (int64_t)sizeof(int16_t);
      break;

    case TSDB_DATA_TYPE_INT:
    case TSDB_DATA_TYPE_UINT:
      elemWidth = (int64_t)sizeof(int32_t);
      break;

    case TSDB_DATA_TYPE_FLOAT:
      elemWidth = (int64_t)sizeof(float);
      break;

    case TSDB_DATA_TYPE_DOUBLE:
      elemWidth = (int64_t)sizeof(double);
      break;

    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_UBIGINT:
    case TSDB_DATA_TYPE_TIMESTAMP:
      elemWidth = (int64_t)sizeof(int64_t);
      break;

    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
      // variable-length columns: stride is the declared column width
      return src + offset * inputBytes;

    default:
      // unknown type: hand back the start of the buffer
      return src;
  }

  return src + (int64_t)offset * elemWidth;
}
/*
 * Worker for col_dummy columns that accompany min_row/max_row: copies the
 * value found at the selected row index of the input column into the output
 * cell.
 *
 * Does nothing when no min_row/max_row is active (FUNC_NOT_VAL), when
 * updateIndex is false, or when the selected index is negative.
 */
static void row_copy_function(SQLFunctionCtx *pCtx) {
  if (!pCtx->updateIndex || pCtx->minMaxRowType == FUNC_NOT_VAL) {
    return;
  }

  // pick the row index that matches the active function
  int16_t rowIdx = (pCtx->minMaxRowType == FUNC_MIN_ROW) ? pCtx->minRowIndex : pCtx->maxRowIndex;
  if (rowIdx < 0) {
    return;
  }

  SET_VAL(pCtx, pCtx->size, 1);

  char *pColumn = GET_INPUT_DATA_LIST(pCtx);
  char *pCell   = get_data_by_offset(pColumn, pCtx->inputType, pCtx->inputBytes, rowIdx);
  assignVal(pCtx->pOutput, pCell, pCtx->inputBytes, pCtx->inputType);
}
static void full_copy_function(SQLFunctionCtx *pCtx) {
copy_function(pCtx);
......@@ -6053,8 +6307,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, block_info, elapsed, histogram, unique, mode, tail
6, 8, -1, -1, -1, 7, 1, -1, -1, 1, -1,
// stateCount, stateDuration, wstart, wstop, wduration, qstart, qstop, qduration, hyperloglog
1, 1, 1, 1, 1, 1, 1, 1, 1,
// stateCount, stateDuration, wstart, wstop, wduration, qstart, qstop, qduration, hyperloglog, min_row, max_row
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6671,5 +6925,41 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
hll_func_finalizer,
hll_func_merge,
dataBlockRequired,
},
{
// 51
"min_row",
TSDB_FUNC_MIN_ROW,
TSDB_FUNC_MIN_ROW,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
min_row_func_setup,
min_row_function,
function_finalizer,
min_row_func_merge,
dataBlockRequired,
},
{
// 52
"max_row",
TSDB_FUNC_MAX_ROW,
TSDB_FUNC_MAX_ROW,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY,
max_row_func_setup,
max_row_function,
function_finalizer,
max_row_func_merge,
dataBlockRequired,
},
{
// 53
"col_dummy",
TSDB_FUNC_COL_DUMMY,
TSDB_FUNC_COL_DUMMY,
TSDB_BASE_FUNC_SO,
function_setup,
row_copy_function,
doFinalizer,
copy_function,
noDataRequired,
}
};
......@@ -413,7 +413,7 @@ static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput
continue;
}
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY || functId == TSDB_FUNC_COL_DUMMY) {
hasTags = true;
continue;
}
......@@ -437,7 +437,7 @@ static bool isScalarWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
continue;
}
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY || functId == TSDB_FUNC_COL_DUMMY) {
hasTags = true;
continue;
}
......@@ -945,6 +945,10 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int16_t minRowIndex = -1, maxRowIndex = -1;
bool updateIndex = false;
int32_t minMaxRowColIndex = -1;
int16_t minMaxRowType = FUNC_NOT_VAL;
for (int32_t k = 0; k < numOfOutput; ++k) {
bool hasAggregates = pCtx[k].preAggVals.isSet;
......@@ -977,7 +981,39 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
if (minMaxRowColIndex == -1) {
minMaxRowColIndex = k;
}
if (functionId == TSDB_FUNC_MIN_ROW) {
minMaxRowType = FUNC_MIN_ROW;
} else {
minMaxRowType = FUNC_MAX_ROW;
}
pCtx[k].updateIndex = false;
} else {
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
pCtx[k].minMaxRowType = minMaxRowType;
}
aAggs[functionId].xFunction(&pCtx[k]);
if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
updateIndex = pCtx[k].updateIndex;
// find the minIndex or maxIndex of this column to determine the index of other columns
if (functionId == TSDB_FUNC_MIN_ROW) {
minRowIndex = pCtx[k].preAggVals.statis.minIndex;
}
if (functionId == TSDB_FUNC_MAX_ROW) {
maxRowIndex = pCtx[k].preAggVals.statis.maxIndex;
}
}
} else {
assert(0);
}
......@@ -992,6 +1028,58 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
pCtx[k].preAggVals.isSet = hasAggregates;
pCtx[k].pInput = start;
}
// update the indices of columns before the one in min_row/max_row
if (updateIndex) {
for (int32_t k = 0; k < minMaxRowColIndex; ++k) {
bool hasAggregates = pCtx[k].preAggVals.isSet;
pCtx[k].size = forwardStep;
pCtx[k].startTs = pWin->skey;
pCtx[k].endTs = pWin->ekey;
// keep it temporarily
char* start = pCtx[k].pInput;
int32_t pos = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? offset : offset - (forwardStep - 1);
if (pCtx[k].pInput != NULL) {
pCtx[k].pInput = (char *)pCtx[k].pInput + pos * pCtx[k].inputBytes;
}
if (tsCol != NULL) {
pCtx[k].ptsList = &tsCol[pos];
}
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
int32_t functionId = pCtx[k].functionId;
if (functionId != TSDB_FUNC_COL_DUMMY) {
continue;
}
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
pCtx[k].minMaxRowType = minMaxRowType;
aAggs[functionId].xFunction(&pCtx[k]);
pCtx[k].minRowIndex = -1;
pCtx[k].maxRowIndex = -1;
pCtx[k].updateIndex = false;
pCtx[k].minMaxRowType = FUNC_NOT_VAL;
}
// restore it
pCtx[k].preAggVals.isSet = hasAggregates;
pCtx[k].pInput = start;
}
}
}
......@@ -1233,6 +1321,10 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
int16_t minRowIndex = -1, maxRowIndex = -1;
bool updateIndex = false;
int32_t minMaxRowColIndex = -1;
int16_t minMaxRowType = FUNC_NOT_VAL;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
......@@ -1243,7 +1335,39 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
if (minMaxRowColIndex == -1) {
minMaxRowColIndex = k;
}
if (functionId == TSDB_FUNC_MIN_ROW) {
minMaxRowType = FUNC_MIN_ROW;
} else {
minMaxRowType = FUNC_MAX_ROW;
}
pCtx[k].updateIndex = false;
} else {
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
pCtx[k].minMaxRowType = minMaxRowType;
}
aAggs[functionId].xFunction(&pCtx[k]);
if (functionId == TSDB_FUNC_MIN_ROW || functionId == TSDB_FUNC_MAX_ROW) {
updateIndex = pCtx[k].updateIndex;
// find the minIndex or maxIndex of this column to determine the index of other columns
if (functionId == TSDB_FUNC_MIN_ROW) {
minRowIndex = pCtx[k].preAggVals.statis.minIndex;
}
if (functionId == TSDB_FUNC_MAX_ROW) {
maxRowIndex = pCtx[k].preAggVals.statis.maxIndex;
}
}
} else {
assert(0);
}
......@@ -1254,6 +1378,32 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
}
}
}
// update the indices of columns before the one in min_row/max_row
if (updateIndex) {
for (int32_t k = 0; k < minMaxRowColIndex; ++k) {
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
pCtx[k].startTs = startTs;
int32_t functionId = pCtx[k].functionId;
if (functionId != TSDB_FUNC_COL_DUMMY) {
continue;
}
pCtx[k].minRowIndex = minRowIndex;
pCtx[k].maxRowIndex = maxRowIndex;
pCtx[k].updateIndex = updateIndex;
pCtx[k].minMaxRowType = minMaxRowType;
aAggs[functionId].xFunction(&pCtx[k]);
pCtx[k].minRowIndex = -1;
pCtx[k].maxRowIndex = -1;
pCtx[k].updateIndex = false;
pCtx[k].minMaxRowType = FUNC_NOT_VAL;
}
}
}
}
static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) {
......@@ -1949,7 +2099,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
continue;
}
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { //ts_select ts,top(col,2)
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_COL_DUMMY) { //ts_select ts,top(col,2)
tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i];
} else if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
......@@ -2024,6 +2174,9 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->end.key = INT64_MIN;
pCtx->startTs = INT64_MIN;
pCtx->minRowIndex = -1;
pCtx->maxRowIndex = -1;
pCtx->qWindow = pQueryAttr->window;
pCtx->allocRows = numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册