未验证 提交 e584465d 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #10351 from taosdata/feature/TD-10987

Feature/TD-10987 add mode function
...@@ -231,15 +231,15 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) ...@@ -231,15 +231,15 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes)
int32_t tscGetResRowLength(SArray* pExprList); int32_t tscGetResRowLength(SArray* pExprList);
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); int16_t size, int16_t resColId, int32_t interSize, bool isTagCol);
SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType); int16_t size, int16_t resColId, int32_t interSize, int32_t colType);
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes); void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); int16_t size, int16_t resColId, int32_t interSize, bool isTagCol);
SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int32_t size); int32_t size);
......
...@@ -615,11 +615,9 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_ ...@@ -615,11 +615,9 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_
aAggs[functionId].mergeFunc(&pCtx[j]); aAggs[functionId].mergeFunc(&pCtx[j]);
} }
if (functionId == TSDB_FUNC_UNIQUE && if (GET_RES_INFO(&(pCtx[j]))->numOfRes == -1){
(GET_RES_INFO(&(pCtx[j]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[j]))->numOfRes == -1)){ tscError("result num is too large.");
tscError("Unique result num is too large. num: %d, limit: %d", longjmp(pInfo->pRuntimeEnv->env, TSDB_CODE_QRY_RESULT_TOO_LARGE);
GET_RES_INFO(&(pCtx[j]))->numOfRes, MAX_UNIQUE_RESULT_ROWS);
longjmp(pInfo->pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE);
} }
} }
} }
......
...@@ -2693,7 +2693,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2693,7 +2693,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg26 = "start param cannot be 0 with 'log_bin'"; const char* msg26 = "start param cannot be 0 with 'log_bin'";
const char* msg27 = "factor param cannot be negative or equal to 0/1"; const char* msg27 = "factor param cannot be negative or equal to 0/1";
const char* msg28 = "the second paramter of diff should be 0 or 1"; const char* msg28 = "the second paramter of diff should be 0 or 1";
const char* msg29 = "key timestamp column cannot be used to unique function"; const char* msg29 = "key timestamp column cannot be used to unique/mode function";
switch (functionId) { switch (functionId) {
case TSDB_FUNC_COUNT: { case TSDB_FUNC_COUNT: {
...@@ -2791,7 +2791,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2791,7 +2791,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_CSUM: case TSDB_FUNC_CSUM:
case TSDB_FUNC_STDDEV: case TSDB_FUNC_STDDEV:
case TSDB_FUNC_LEASTSQR: case TSDB_FUNC_LEASTSQR:
case TSDB_FUNC_ELAPSED: { case TSDB_FUNC_ELAPSED:
case TSDB_FUNC_MODE: {
// 1. valid the number of parameters // 1. valid the number of parameters
int32_t numOfParams = int32_t numOfParams =
(pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList); (pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList);
...@@ -2852,7 +2853,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2852,7 +2853,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// 2. check if sql function can be applied on this column data type // 2. check if sql function can be applied on this column data type
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) { if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX ){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) &&
(functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
...@@ -4010,7 +4013,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -4010,7 +4013,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
(functionId == TSDB_FUNC_SAMPLE) || (functionId == TSDB_FUNC_SAMPLE) ||
(functionId == TSDB_FUNC_ELAPSED) || (functionId == TSDB_FUNC_ELAPSED) ||
(functionId == TSDB_FUNC_HISTOGRAM) || (functionId == TSDB_FUNC_HISTOGRAM) ||
(functionId == TSDB_FUNC_UNIQUE)) { (functionId == TSDB_FUNC_UNIQUE) ||
(functionId == TSDB_FUNC_MODE)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes, if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
......
...@@ -2535,7 +2535,7 @@ void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArr ...@@ -2535,7 +2535,7 @@ void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArr
SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType) { int16_t size, int16_t resColId, int32_t interSize, int32_t colType) {
SExprInfo* pExpr = calloc(1, sizeof(SExprInfo)); SExprInfo* pExpr = calloc(1, sizeof(SExprInfo));
if (pExpr == NULL) { if (pExpr == NULL) {
return NULL; return NULL;
...@@ -2592,7 +2592,7 @@ SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SCo ...@@ -2592,7 +2592,7 @@ SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SCo
} }
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) { int16_t size, int16_t resColId, int32_t interSize, bool isTagCol) {
int32_t num = (int32_t)taosArrayGetSize(pQueryInfo->exprList); int32_t num = (int32_t)taosArrayGetSize(pQueryInfo->exprList);
if (index == num) { if (index == num) {
return tscExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); return tscExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
...@@ -2605,7 +2605,7 @@ SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t function ...@@ -2605,7 +2605,7 @@ SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t function
} }
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) { int16_t size, int16_t resColId, int32_t interSize, bool isTagCol) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
SExprInfo* pExpr = tscExprCreate(pTableMetaInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); SExprInfo* pExpr = tscExprCreate(pTableMetaInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
taosArrayPush(pQueryInfo->exprList, &pExpr); taosArrayPush(pQueryInfo->exprList, &pExpr);
...@@ -4937,7 +4937,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu ...@@ -4937,7 +4937,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
pse->colInfo.colIndex = i; pse->colInfo.colIndex = i;
pse->colType = pExpr->base.resType; pse->colType = pExpr->base.resType;
if(pExpr->base.resBytes > INT16_MAX && pExpr->base.functionId == TSDB_FUNC_UNIQUE){ if(pExpr->base.resBytes > INT16_MAX &&
(pExpr->base.functionId == TSDB_FUNC_UNIQUE || pExpr->base.functionId == TSDB_FUNC_MODE)){
pQueryAttr->interBytesForGlobal = pExpr->base.resBytes; pQueryAttr->interBytesForGlobal = pExpr->base.resBytes;
}else{ }else{
pse->colBytes = pExpr->base.resBytes; pse->colBytes = pExpr->base.resBytes;
......
...@@ -293,7 +293,7 @@ int32_t* taosGetErrno(); ...@@ -293,7 +293,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error") #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version") #define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version")
#define TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"unique result num is too large") #define TSDB_CODE_QRY_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"result num is too large")
// grant // grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired" #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
......
...@@ -79,8 +79,9 @@ extern "C" { ...@@ -79,8 +79,9 @@ extern "C" {
#define TSDB_FUNC_ELAPSED 37 #define TSDB_FUNC_ELAPSED 37
#define TSDB_FUNC_HISTOGRAM 38 #define TSDB_FUNC_HISTOGRAM 38
#define TSDB_FUNC_UNIQUE 39 #define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_MODE 40
#define TSDB_FUNC_MAX_NUM 40 #define TSDB_FUNC_MAX_NUM 41
#define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
...@@ -148,7 +149,7 @@ typedef struct SResultRowCellInfo { ...@@ -148,7 +149,7 @@ typedef struct SResultRowCellInfo {
int8_t hasResult; // result generated, not NULL value int8_t hasResult; // result generated, not NULL value
bool initialized; // output buffer has been initialized bool initialized; // output buffer has been initialized
bool complete; // query has completed bool complete; // query has completed
uint32_t numOfRes; // num of output result in current buffer int32_t numOfRes; // num of output result in current buffer
} SResultRowCellInfo; } SResultRowCellInfo;
typedef struct SPoint1 { typedef struct SPoint1 {
...@@ -203,6 +204,7 @@ typedef struct SQLFunctionCtx { ...@@ -203,6 +204,7 @@ typedef struct SQLFunctionCtx {
SPoint1 end; SPoint1 end;
SHashObj **pUniqueSet; // for unique function SHashObj **pUniqueSet; // for unique function
SHashObj **pModeSet; // for mode function
} SQLFunctionCtx; } SQLFunctionCtx;
typedef struct SAggFunctionInfo { typedef struct SAggFunctionInfo {
......
...@@ -91,6 +91,7 @@ typedef struct SResultRow { ...@@ -91,6 +91,7 @@ typedef struct SResultRow {
STimeWindow win; STimeWindow win;
char *key; // start key of current result row char *key; // start key of current result row
SHashObj *uniqueHash; // for unique function SHashObj *uniqueHash; // for unique function
SHashObj *modeHash; // for mode function
} SResultRow; } SResultRow;
typedef struct SResultRowCell { typedef struct SResultRowCell {
......
...@@ -80,6 +80,8 @@ typedef struct SDiskbasedResultBuf { ...@@ -80,6 +80,8 @@ typedef struct SDiskbasedResultBuf {
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define MAX_UNIQUE_RESULT_ROWS (1000) #define MAX_UNIQUE_RESULT_ROWS (1000)
#define MAX_UNIQUE_RESULT_SIZE (1024*1024*1) #define MAX_UNIQUE_RESULT_SIZE (1024*1024*1)
#define MAX_MODE_INNER_RESULT_ROWS (1000000)
#define MAX_MODE_INNER_RESULT_SIZE (1024*1024*10)
/** /**
* create disk-based result buffer * create disk-based result buffer
* @param pResultBuf * @param pResultBuf
......
...@@ -233,6 +233,16 @@ typedef struct { ...@@ -233,6 +233,16 @@ typedef struct {
char res[]; char res[];
} SUniqueFuncInfo; } SUniqueFuncInfo;
/*
 * One entry of the MODE function's intermediate result.
 *
 * getResultDataInfo() sizes each entry as sizeof(ModeUnit) + dataBytes, so the
 * flexible array member is followed by dataBytes bytes of payload.
 */
typedef struct {
int64_t count; // NOTE(review): presumably the number of occurrences of the value held in data — confirm against the mode implementation
char data[];   // flexible array member; payload length is the column's dataBytes (see getResultDataInfo)
} ModeUnit;
/*
 * Header of the MODE function's intermediate buffer.
 *
 * getResultDataInfo() computes the buffer size as
 * sizeof(SModeFuncInfo) + MAX_MODE_INNER_RESULT_ROWS * (sizeof(ModeUnit) + dataBytes),
 * capped at MAX_MODE_INNER_RESULT_SIZE.
 */
typedef struct {
int32_t num; // NOTE(review): presumably the number of ModeUnit entries currently stored in res — confirm
char res[];  // flexible array member; trailing storage sized by getResultDataInfo()
} SModeFuncInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) { if (!isValidDataType(dataType)) {
...@@ -369,10 +379,22 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -369,10 +379,22 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
int64_t size = sizeof(UniqueUnit) + dataBytes + extLength; int64_t size = sizeof(UniqueUnit) + dataBytes + extLength;
size *= param; size *= param;
size += sizeof(SUniqueFuncInfo); size += sizeof(SUniqueFuncInfo);
if (size > MAX_UNIQUE_RESULT_SIZE){ if (size > MAX_UNIQUE_RESULT_SIZE) {
size = MAX_UNIQUE_RESULT_SIZE; size = MAX_UNIQUE_RESULT_SIZE;
} }
*bytes = size; *bytes = (int32_t)size;
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_MODE) {
*type = TSDB_DATA_TYPE_BINARY;
int64_t size = sizeof(ModeUnit) + dataBytes;
size *= MAX_MODE_INNER_RESULT_ROWS;
size += sizeof(SModeFuncInfo);
if (size > MAX_MODE_INNER_RESULT_SIZE){
size = MAX_MODE_INNER_RESULT_SIZE;
}
*bytes = (int32_t)size;
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -513,7 +535,18 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -513,7 +535,18 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
size = MAX_UNIQUE_RESULT_SIZE; size = MAX_UNIQUE_RESULT_SIZE;
} }
*interBytes = (int32_t)size; *interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_SAMPLE) { } else if(functionId == TSDB_FUNC_MODE) {
*type = (int16_t)dataType;
*bytes = dataBytes;
int64_t size = sizeof(ModeUnit) + dataBytes;
size *= MAX_MODE_INNER_RESULT_ROWS;
size += sizeof(SModeFuncInfo);
if (size > MAX_MODE_INNER_RESULT_SIZE){
size = MAX_MODE_INNER_RESULT_SIZE;
}
*interBytes = (int32_t)size;
return TSDB_CODE_SUCCESS;
}else if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType; *type = (int16_t)dataType;
*bytes = dataBytes; *bytes = dataBytes;
size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param; size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param;
...@@ -2245,20 +2278,12 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { ...@@ -2245,20 +2278,12 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
tfree(pData); tfree(pData);
} }
/* static void *getOutputInfo(SQLFunctionCtx *pCtx) {
* Parameters values:
* 1. param[0]: maximum allowable results
* 2. param[1]: order by type (time or value)
* 3. param[2]: asc/desc order
*
* top/bottom use the intermediate result buffer to keep the intermediate result
*/
static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer // only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return (STopBotInfo*) pCtx->pOutput; return pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo); return GET_ROWCELL_INTERBUF(pResInfo);
} }
...@@ -2291,7 +2316,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha ...@@ -2291,7 +2316,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
return true; return true;
} }
STopBotInfo *pTopBotInfo = getTopBotOutputInfo(pCtx); STopBotInfo *pTopBotInfo = getOutputInfo(pCtx);
// required number of results are not reached, continue load data block // required number of results are not reached, continue load data block
if (pTopBotInfo->num < pCtx->param[0].i64) { if (pTopBotInfo->num < pCtx->param[0].i64) {
...@@ -2346,7 +2371,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* ...@@ -2346,7 +2371,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
return false; return false;
} }
STopBotInfo *pInfo = getTopBotOutputInfo(pCtx); STopBotInfo *pInfo = getOutputInfo(pCtx);
buildTopBotStruct(pInfo, pCtx); buildTopBotStruct(pInfo, pCtx);
return true; return true;
} }
...@@ -2354,7 +2379,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* ...@@ -2354,7 +2379,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
static void top_function(SQLFunctionCtx *pCtx) { static void top_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
STopBotInfo *pRes = getTopBotOutputInfo(pCtx); STopBotInfo *pRes = getOutputInfo(pCtx);
assert(pRes->num >= 0); assert(pRes->num >= 0);
if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) { if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) {
...@@ -2393,7 +2418,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2393,7 +2418,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
// construct the input data struct from binary data // construct the input data struct from binary data
buildTopBotStruct(pInput, pCtx); buildTopBotStruct(pInput, pCtx);
STopBotInfo *pOutput = getTopBotOutputInfo(pCtx); STopBotInfo *pOutput = getOutputInfo(pCtx);
// the intermediate result is binary, we only use the output data type // the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) { for (int32_t i = 0; i < pInput->num; ++i) {
...@@ -2413,7 +2438,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2413,7 +2438,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
static void bottom_function(SQLFunctionCtx *pCtx) { static void bottom_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
STopBotInfo *pRes = getTopBotOutputInfo(pCtx); STopBotInfo *pRes = getOutputInfo(pCtx);
if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) { if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) {
buildTopBotStruct(pRes, pCtx); buildTopBotStruct(pRes, pCtx);
...@@ -2450,7 +2475,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2450,7 +2475,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) {
// construct the input data struct from binary data // construct the input data struct from binary data
buildTopBotStruct(pInput, pCtx); buildTopBotStruct(pInput, pCtx);
STopBotInfo *pOutput = getTopBotOutputInfo(pCtx); STopBotInfo *pOutput = getOutputInfo(pCtx);
// the intermediate result is binary, we only use the output data type // the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) { for (int32_t i = 0; i < pInput->num; ++i) {
...@@ -2619,18 +2644,6 @@ static void buildHistogramInfo(SAPercentileInfo* pInfo) { ...@@ -2619,18 +2644,6 @@ static void buildHistogramInfo(SAPercentileInfo* pInfo) {
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo)); pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
} }
/*
 * Select the buffer that holds the approximate-percentile state for pCtx.
 *
 * Returns pCtx->pOutput when this is a super table query that is not in the
 * merge stage; otherwise returns the intermediate buffer of the result cell.
 */
static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);

  if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
    return (SAPercentileInfo *)pCtx->pOutput;
  }

  SAPercentileInfo *pInterBuf = GET_ROWCELL_INTERBUF(pCell);
  return pInterBuf;
}
// //
// ----------------- tdigest ------------------- // ----------------- tdigest -------------------
// //
...@@ -2642,7 +2655,7 @@ static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) ...@@ -2642,7 +2655,7 @@ static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo)
} }
// new TDigest // new TDigest
SAPercentileInfo *pInfo = getAPerctInfo(pCtx); SAPercentileInfo *pInfo = getOutputInfo(pCtx);
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION); pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION);
return true; return true;
...@@ -2652,7 +2665,7 @@ static void tdigest_do(SQLFunctionCtx *pCtx) { ...@@ -2652,7 +2665,7 @@ static void tdigest_do(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo * pAPerc = getAPerctInfo(pCtx); SAPercentileInfo * pAPerc = getOutputInfo(pCtx);
assert(pAPerc->pTDigest != NULL); assert(pAPerc->pTDigest != NULL);
if(pAPerc->pTDigest == NULL) { if(pAPerc->pTDigest == NULL) {
...@@ -2694,7 +2707,7 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) { ...@@ -2694,7 +2707,7 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) {
return ; return ;
} }
SAPercentileInfo *pOutput = getAPerctInfo(pCtx); SAPercentileInfo *pOutput = getOutputInfo(pCtx);
if(pOutput->pTDigest->num_centroids == 0) { if(pOutput->pTDigest->num_centroids == 0) {
memcpy(pOutput->pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION)); memcpy(pOutput->pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pOutput->pTDigest, COMPRESSION); tdigestAutoFill(pOutput->pTDigest, COMPRESSION);
...@@ -2711,7 +2724,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2711,7 +2724,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) {
double q = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey; double q = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo * pAPerc = getAPerctInfo(pCtx); SAPercentileInfo * pAPerc = getOutputInfo(pCtx);
if (pCtx->currentStage == MERGE_STAGE) { if (pCtx->currentStage == MERGE_STAGE) {
if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
...@@ -2755,7 +2768,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* ...@@ -2755,7 +2768,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
return false; return false;
} }
SAPercentileInfo *pInfo = getAPerctInfo(pCtx); SAPercentileInfo *pInfo = getOutputInfo(pCtx);
buildHistogramInfo(pInfo); buildHistogramInfo(pInfo);
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
...@@ -2772,7 +2785,7 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { ...@@ -2772,7 +2785,7 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx); SAPercentileInfo *pInfo = getOutputInfo(pCtx);
buildHistogramInfo(pInfo); buildHistogramInfo(pInfo);
assert(pInfo->pHisto->elems != NULL); assert(pInfo->pHisto->elems != NULL);
...@@ -2816,7 +2829,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2816,7 +2829,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
return; return;
} }
SAPercentileInfo *pOutput = getAPerctInfo(pCtx); SAPercentileInfo *pOutput = getOutputInfo(pCtx);
buildHistogramInfo(pOutput); buildHistogramInfo(pOutput);
SHistogramInfo *pHisto = pOutput->pHisto; SHistogramInfo *pHisto = pOutput->pHisto;
...@@ -4710,17 +4723,6 @@ static void mavg_function(SQLFunctionCtx *pCtx) { ...@@ -4710,17 +4723,6 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
// Sample function with reservoir sampling algorithm // Sample function with reservoir sampling algorithm
/*
 * Select the buffer that holds the sample function state for pCtx.
 *
 * A super table query outside the merge stage uses pCtx->pOutput directly;
 * every other case uses the intermediate buffer of the result cell.
 */
static SSampleFuncInfo* getSampleFuncOutputInfo(SQLFunctionCtx *pCtx) {
  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);

  if (!pCtx->stableQuery || pCtx->currentStage == MERGE_STAGE) {
    // normal table query, or super table query in the merge stage
    return GET_ROWCELL_INTERBUF(pCell);
  }

  // super table query before the merge stage
  return (SSampleFuncInfo *)pCtx->pOutput;
}
static void assignResultSample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes, char *inputTags) { static void assignResultSample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes, char *inputTags) {
assignVal(pInfo->values + index*bytes, pData, bytes, type); assignVal(pInfo->values + index*bytes, pData, bytes, type);
*(pInfo->timeStamps + index) = ts; *(pInfo->timeStamps + index) = ts;
...@@ -4800,7 +4802,7 @@ static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes ...@@ -4800,7 +4802,7 @@ static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
srand(taosSafeRand()); srand(taosSafeRand());
SSampleFuncInfo *pRes = getSampleFuncOutputInfo(pCtx); SSampleFuncInfo *pRes = getOutputInfo(pCtx);
pRes->totalPoints = 0; pRes->totalPoints = 0;
pRes->numSampled = 0; pRes->numSampled = 0;
pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo)); pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo));
...@@ -4814,7 +4816,7 @@ static void sample_function(SQLFunctionCtx *pCtx) { ...@@ -4814,7 +4816,7 @@ static void sample_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSampleFuncInfo *pRes = getSampleFuncOutputInfo(pCtx); SSampleFuncInfo *pRes = getOutputInfo(pCtx);
if (pRes->values != ((char*)pRes + sizeof(SSampleFuncInfo))) { if (pRes->values != ((char*)pRes + sizeof(SSampleFuncInfo))) {
pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo)); pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo));
...@@ -4852,7 +4854,7 @@ static void sample_func_merge(SQLFunctionCtx *pCtx) { ...@@ -4852,7 +4854,7 @@ static void sample_func_merge(SQLFunctionCtx *pCtx) {
pInput->timeStamps = (int64_t*)((char*)pInput->values + pInput->colBytes * pCtx->param[0].i64); pInput->timeStamps = (int64_t*)((char*)pInput->values + pInput->colBytes * pCtx->param[0].i64);
pInput->taglists = (char*)pInput->timeStamps + sizeof(int64_t)*pCtx->param[0].i64; pInput->taglists = (char*)pInput->timeStamps + sizeof(int64_t)*pCtx->param[0].i64;
SSampleFuncInfo *pOutput = getSampleFuncOutputInfo(pCtx); SSampleFuncInfo *pOutput = getOutputInfo(pCtx);
pOutput->totalPoints = pInput->totalPoints; pOutput->totalPoints = pInput->totalPoints;
pOutput->numSampled = pInput->numSampled; pOutput->numSampled = pInput->numSampled;
for (int32_t i = 0; i < pInput->numSampled; ++i) { for (int32_t i = 0; i < pInput->numSampled; ++i) {
...@@ -4886,20 +4888,12 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -4886,20 +4888,12 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
// elapsed function // elapsed function
/*
 * Select the buffer that holds the elapsed function state for pCtx.
 *
 * Returns pCtx->pOutput for a super table query that is not in the merge
 * stage, and the intermediate buffer of the result cell otherwise.
 */
static SElapsedInfo * getSElapsedInfo(SQLFunctionCtx *pCtx) {
  SElapsedInfo *pElapsed = NULL;

  if (!pCtx->stableQuery || pCtx->currentStage == MERGE_STAGE) {
    pElapsed = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  } else {
    pElapsed = (SElapsedInfo *)pCtx->pOutput;
  }

  return pElapsed;
}
static bool elapsedSetup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool elapsedSetup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
SElapsedInfo *pInfo = getSElapsedInfo(pCtx); SElapsedInfo *pInfo = getOutputInfo(pCtx);
pInfo->min = MAX_TS_KEY; pInfo->min = MAX_TS_KEY;
pInfo->max = 0; pInfo->max = 0;
pInfo->hasResult = 0; pInfo->hasResult = 0;
...@@ -4912,7 +4906,7 @@ static int32_t elapsedRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t col ...@@ -4912,7 +4906,7 @@ static int32_t elapsedRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t col
} }
static void elapsedFunction(SQLFunctionCtx *pCtx) { static void elapsedFunction(SQLFunctionCtx *pCtx) {
SElapsedInfo *pInfo = getSElapsedInfo(pCtx); SElapsedInfo *pInfo = getOutputInfo(pCtx);
if (pCtx->preAggVals.isSet) { if (pCtx->preAggVals.isSet) {
if (pInfo->min == MAX_TS_KEY) { if (pInfo->min == MAX_TS_KEY) {
pInfo->min = pCtx->preAggVals.statis.min; pInfo->min = pCtx->preAggVals.statis.min;
...@@ -4979,7 +4973,7 @@ elapsedOver: ...@@ -4979,7 +4973,7 @@ elapsedOver:
} }
static void elapsedMerge(SQLFunctionCtx *pCtx) { static void elapsedMerge(SQLFunctionCtx *pCtx) {
SElapsedInfo *pInfo = getSElapsedInfo(pCtx); SElapsedInfo *pInfo = getOutputInfo(pCtx);
memcpy(pInfo, pCtx->pInput, (size_t)pCtx->inputBytes); memcpy(pInfo, pCtx->pInput, (size_t)pCtx->inputBytes);
GET_RES_INFO(pCtx)->hasResult = pInfo->hasResult; GET_RES_INFO(pCtx)->hasResult = pInfo->hasResult;
} }
...@@ -5002,25 +4996,12 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) { ...@@ -5002,25 +4996,12 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) {
} }
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
// histogram function
/*
 * Return the SHistogramFuncInfo working area for this context.
 *
 * Only the first stage of a super-table query writes straight into the final
 * output buffer (pCtx->pOutput). Normal-table queries and the merge stage of
 * a super-table query keep their state in the result cell's intermediate buffer.
 */
static SHistogramFuncInfo* getHistogramFuncOutputInfo(SQLFunctionCtx *pCtx) {
  const bool inMergeStage = (pCtx->currentStage == MERGE_STAGE);

  if (!pCtx->stableQuery || inMergeStage) {
    SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);
    return GET_ROWCELL_INTERBUF(pCell);
  }

  return (SHistogramFuncInfo *) pCtx->pOutput;
}
static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
SHistogramFuncInfo *pRes = getHistogramFuncOutputInfo(pCtx); SHistogramFuncInfo *pRes = getOutputInfo(pCtx);
if (!pRes) { if (!pRes) {
return false; return false;
} }
...@@ -5044,7 +5025,7 @@ static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* p ...@@ -5044,7 +5025,7 @@ static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* p
static void histogram_function(SQLFunctionCtx *pCtx) { static void histogram_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx);
SHistogramFuncInfo* pRes = getHistogramFuncOutputInfo(pCtx); SHistogramFuncInfo* pRes = getOutputInfo(pCtx);
if (pRes->orderedBins != (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo))) { if (pRes->orderedBins != (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo))) {
pRes->orderedBins = (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo)); pRes->orderedBins = (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo));
...@@ -5092,7 +5073,7 @@ static void histogram_func_merge(SQLFunctionCtx *pCtx) { ...@@ -5092,7 +5073,7 @@ static void histogram_func_merge(SQLFunctionCtx *pCtx) {
SHistogramFuncInfo* pInput = (SHistogramFuncInfo*) GET_INPUT_DATA_LIST(pCtx); SHistogramFuncInfo* pInput = (SHistogramFuncInfo*) GET_INPUT_DATA_LIST(pCtx);
pInput->orderedBins = (SHistogramFuncBin*)((char*)pInput + sizeof(SHistogramFuncInfo)); pInput->orderedBins = (SHistogramFuncBin*)((char*)pInput + sizeof(SHistogramFuncInfo));
SHistogramFuncInfo* pRes = getHistogramFuncOutputInfo(pCtx); SHistogramFuncInfo* pRes = getOutputInfo(pCtx);
for (int32_t i = 0; i < pInput->numOfBins; ++i) { for (int32_t i = 0; i < pInput->numOfBins; ++i) {
pRes->orderedBins[i].count += pInput->orderedBins[i].count; pRes->orderedBins[i].count += pInput->orderedBins[i].count;
} }
...@@ -5129,18 +5110,6 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5129,18 +5110,6 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
// unique use the intermediate result buffer to keep the intermediate result
/*
 * Return the SUniqueFuncInfo working area for this context.
 *
 * Only the first stage of a super-table query writes straight into the final
 * output buffer (pCtx->pOutput). Normal-table queries and the merge stage of
 * a super-table query keep their state in the result cell's intermediate buffer.
 */
static SUniqueFuncInfo *getUniqueOutputInfo(SQLFunctionCtx *pCtx) {
  const bool inMergeStage = (pCtx->currentStage == MERGE_STAGE);

  if (!pCtx->stableQuery || inMergeStage) {
    SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);
    return GET_ROWCELL_INTERBUF(pCell);
  }

  return (SUniqueFuncInfo*) pCtx->pOutput;
}
// unique // unique
static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) { static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
...@@ -5174,7 +5143,7 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) { ...@@ -5174,7 +5143,7 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) {
tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1)); tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1));
for (int32_t i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
int16_t offset = sizeof(UniqueUnit) + bytes; int32_t offset = (int32_t)sizeof(UniqueUnit) + bytes;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
memcpy(pData[j], tvp + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); memcpy(pData[j], tvp + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes);
offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes; offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
...@@ -5238,7 +5207,7 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK ...@@ -5238,7 +5207,7 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK
} }
static void unique_function(SQLFunctionCtx *pCtx) { static void unique_function(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx); SUniqueFuncInfo *pInfo = getOutputInfo(pCtx);
for (int32_t i = 0; i < pCtx->size; i++) { for (int32_t i = 0; i < pCtx->size; i++) {
char *pData = GET_INPUT_DATA(pCtx, i); char *pData = GET_INPUT_DATA(pCtx, i);
...@@ -5248,7 +5217,8 @@ static void unique_function(SQLFunctionCtx *pCtx) { ...@@ -5248,7 +5217,8 @@ static void unique_function(SQLFunctionCtx *pCtx) {
} }
do_unique_function(pCtx, pInfo, k, pData, NULL, pCtx->inputBytes, pCtx->inputType); do_unique_function(pCtx, pInfo, k, pData, NULL, pCtx->inputBytes, pCtx->inputType);
if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE
|| (pInfo->num > MAX_UNIQUE_RESULT_ROWS)){
GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory
return; return;
} }
...@@ -5259,7 +5229,7 @@ static void unique_function(SQLFunctionCtx *pCtx) { ...@@ -5259,7 +5229,7 @@ static void unique_function(SQLFunctionCtx *pCtx) {
static void unique_function_merge(SQLFunctionCtx *pCtx) { static void unique_function_merge(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx); SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx);
SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx); SUniqueFuncInfo *pOutput = getOutputInfo(pCtx);
size_t size = sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen; size_t size = sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen;
for (int32_t i = 0; i < pInput->num; ++i) { for (int32_t i = 0; i < pInput->num; ++i) {
char *tmp = pInput->res + i* size; char *tmp = pInput->res + i* size;
...@@ -5268,13 +5238,14 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) { ...@@ -5268,13 +5238,14 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) {
char *tags = tmp + sizeof(UniqueUnit) + pCtx->outputBytes; char *tags = tmp + sizeof(UniqueUnit) + pCtx->outputBytes;
do_unique_function(pCtx, pOutput, timestamp, data, tags, pCtx->outputBytes, pCtx->outputType); do_unique_function(pCtx, pOutput, timestamp, data, tags, pCtx->outputBytes, pCtx->outputType);
if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE
|| (pOutput->num > MAX_UNIQUE_RESULT_ROWS)){
GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory
return; return;
} }
} }
GET_RES_INFO(pCtx)->numOfRes = pOutput->num; // GET_RES_INFO(pCtx)->numOfRes = pOutput->num;
} }
typedef struct{ typedef struct{
...@@ -5284,11 +5255,11 @@ typedef struct{ ...@@ -5284,11 +5255,11 @@ typedef struct{
static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param) { static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param) {
UiqueSupporter *support = (UiqueSupporter *)param; UiqueSupporter *support = (UiqueSupporter *)param;
return support->comparFn(p1 + support->dataOffset, p2 + support->dataOffset); return support->comparFn((const char*)p1 + support->dataOffset, (const char*)p2 + support->dataOffset);
} }
static void unique_func_finalizer(SQLFunctionCtx *pCtx) { static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx); SUniqueFuncInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
GET_RES_INFO(pCtx)->numOfRes = pInfo->num; GET_RES_INFO(pCtx)->numOfRes = pInfo->num;
int32_t bytes = 0; int32_t bytes = 0;
...@@ -5317,6 +5288,114 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5317,6 +5288,114 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
/*
 * Prepare a context for the mode() aggregate.
 *
 * Runs the common function_setup() first and propagates its failure. Then it
 * makes sure the hash set referenced through pCtx->pModeSet is ready for use:
 * an existing set is cleared so it can be reused, otherwise a new one is
 * created with the default binary hash function and no locking.
 *
 * Returns true when the context is ready. Returns false when function_setup()
 * fails or when the hash set could not be created. The accumulation code
 * looks values up in, and inserts them into, *pCtx->pModeSet unconditionally,
 * so reporting success with a missing set would be misleading.
 */
static bool mode_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
  if (!function_setup(pCtx, pResInfo)) {
    return false;
  }

  if (*pCtx->pModeSet != NULL) {
    // reuse the set left by a previous use of this result row
    taosHashClear(*pCtx->pModeSet);
    return true;
  }

  *pCtx->pModeSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

  // NOTE(review): assumes taosHashInit() returns NULL on failure - confirm against hash.h
  return *pCtx->pModeSet != NULL;
}
/*
 * Account for `count` occurrences of the value at pData in the mode accumulator.
 *
 * pInfo->res is used as a packed array of units, each a ModeUnit header
 * followed by `bytes` bytes of value. *pCtx->pModeSet maps a value to the
 * address of its unit. A value seen before has its counter increased; a new
 * value is appended at index pInfo->num and registered in the set.
 *
 * This function does no capacity check itself; the callers compare the
 * accumulated size against MAX_MODE_INNER_RESULT_SIZE after each call.
 */
static void do_mode_function(SQLFunctionCtx *pCtx, SModeFuncInfo *pInfo, char *pData, int64_t count, int32_t bytes, int16_t type){
  // Variable-length values are keyed by their real length: the bytes after
  // the payload are dirty and must not take part in hashing.
  int32_t keyLen = IS_VAR_DATA_TYPE(type) ? (int32_t)varDataTLen(pData) : bytes;

  ModeUnit **ppKnown = taosHashGet(*pCtx->pModeSet, pData, keyLen);
  if (ppKnown != NULL) {
    (*ppKnown)->count += count;
    return;
  }

  // first occurrence: take the next free unit in the packed array
  char *slot = pInfo->res + pInfo->num * (sizeof(ModeUnit) + bytes);
  ((ModeUnit*)slot)->count = count;
  memcpy(slot + sizeof(ModeUnit), pData, bytes);

  // the set stores the unit's address, not a copy of the unit
  taosHashPut(*pCtx->pModeSet, pData, keyLen, &slot, sizeof(ModeUnit*));
  pInfo->num++;
}
/*
 * Accumulation step of mode(): feed every non-NULL input row into the
 * value/count table with a count of one.
 *
 * After each row the size of the table is compared with
 * MAX_MODE_INNER_RESULT_SIZE. Once that limit is reached, numOfRes is set to
 * -1 as an out-of-memory marker and processing stops. Otherwise numOfRes is
 * set to 1 when the batch has been consumed.
 */
static void mode_function(SQLFunctionCtx *pCtx) {
  SModeFuncInfo *pAcc = getOutputInfo(pCtx);
  const size_t unitSize = sizeof(ModeUnit) + pCtx->inputBytes;

  for (int32_t row = 0; row < pCtx->size; row++) {
    char *pVal = GET_INPUT_DATA(pCtx, row);

    if (!(pCtx->hasNull && isNull(pVal, pCtx->inputType))) {
      do_mode_function(pCtx, pAcc, pVal, 1, pCtx->inputBytes, pCtx->inputType);

      if (sizeof(SModeFuncInfo) + pAcc->num * unitSize >= MAX_MODE_INNER_RESULT_SIZE) {
        GET_RES_INFO(pCtx)->numOfRes = -1;  // mark out of memory
        return;
      }
    }
  }

  GET_RES_INFO(pCtx)->numOfRes = 1;
}
/*
 * Merge step of mode(): fold a partial value/count table from the input into
 * the output table.
 *
 * The input is read as a packed array of units (a ModeUnit header followed by
 * pCtx->outputBytes bytes of value). Each unit's count is added to the
 * matching value in the output table. If the output table reaches
 * MAX_MODE_INNER_RESULT_SIZE, numOfRes is set to -1 as an out-of-memory
 * marker and merging stops.
 */
static void mode_function_merge(SQLFunctionCtx *pCtx) {
  SModeFuncInfo *pSrc = (SModeFuncInfo *)GET_INPUT_DATA_LIST(pCtx);
  SModeFuncInfo *pDst = getOutputInfo(pCtx);

  const size_t unitSize = sizeof(ModeUnit) + pCtx->outputBytes;

  for (int32_t idx = 0; idx < pSrc->num; ++idx) {
    char *unit  = pSrc->res + idx * unitSize;
    char *value = unit + sizeof(ModeUnit);

    do_mode_function(pCtx, pDst, value, ((ModeUnit*)unit)->count, pCtx->outputBytes, pCtx->outputType);

    if (sizeof(SModeFuncInfo) + pDst->num * unitSize >= MAX_MODE_INNER_RESULT_SIZE) {
      GET_RES_INFO(pCtx)->numOfRes = -1;  // mark out of memory
      return;
    }
  }
}
/*
 * Final step of mode(): pick the value with the strictly highest count from
 * the table in the intermediate buffer and write it to pCtx->pOutput.
 *
 * The value width and type come from the output side in MERGE_STAGE and from
 * the input side otherwise. If the table is empty, or the highest count is
 * shared by more than one value, the output is set to NULL. The result always
 * has exactly one row.
 */
static void mode_func_finalizer(SQLFunctionCtx *pCtx) {
  const bool merging = (pCtx->currentStage == MERGE_STAGE);
  if (merging) {
    assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
  }

  const int32_t bytes = merging ? pCtx->outputBytes : pCtx->inputBytes;
  const int32_t type  = merging ? pCtx->outputType  : pCtx->inputType;

  SResultRowCellInfo *pCell = GET_RES_INFO(pCtx);
  SModeFuncInfo *pAcc = GET_ROWCELL_INTERBUF(pCell);
  const size_t unitSize = sizeof(ModeUnit) + bytes;

  char   *winner = NULL;
  int64_t best   = 0;
  for (int32_t idx = 0; idx < pAcc->num; ++idx) {
    char   *unit = pAcc->res + idx * unitSize;
    int64_t cnt  = ((ModeUnit*)unit)->count;

    if (cnt == best) {
      winner = NULL;  // tie on the current maximum: no single mode so far
    } else if (cnt > best) {
      best   = cnt;
      winner = unit;
    }
  }

  if (winner == NULL) {
    setNull(pCtx->pOutput, type, 0);
  } else {
    memcpy(pCtx->pOutput, winner + sizeof(ModeUnit), bytes);
  }

  pCell->numOfRes = 1;
  doFinalizer(pCtx);
}
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
/* /*
* function compatible list. * function compatible list.
...@@ -5337,8 +5416,8 @@ int32_t functionCompatList[] = { ...@@ -5337,8 +5416,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, // tid_tag, deriv, csum, mavg, sample,
6, 8, -1, -1, -1, 6, 8, -1, -1, -1,
// block_info,elapsed,histogram,unique // block_info,elapsed,histogram,unique,mode
7, 1, -1, -1 7, 1, -1, -1, 1
}; };
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
...@@ -5823,5 +5902,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ ...@@ -5823,5 +5902,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
unique_func_finalizer, unique_func_finalizer,
unique_function_merge, unique_function_merge,
dataBlockRequired, dataBlockRequired,
},
{
// 40
"mode",
TSDB_FUNC_MODE,
TSDB_FUNC_MODE,
TSDB_BASE_FUNC_SO,
mode_function_setup,
mode_function,
mode_func_finalizer,
mode_function_merge,
dataBlockRequired,
} }
}; };
...@@ -362,7 +362,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO ...@@ -362,7 +362,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
qError("size is too large, failed to allocate column buffer for output buffer"); qError("size is too large, failed to allocate column buffer for output buffer");
tmp = 128*1024*1024; tmp = 128*1024*1024;
} }
int32_t size = MAX(tmp, minSize); int32_t size = (int32_t)MAX(tmp, minSize);
idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform
if (idata.pData == NULL) { if (idata.pData == NULL) {
qError("failed to allocate column buffer for output buffer"); qError("failed to allocate column buffer for output buffer");
...@@ -1009,11 +1009,9 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx ...@@ -1009,11 +1009,9 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
} }
} }
if (functionId == TSDB_FUNC_UNIQUE && if (GET_RES_INFO(&(pCtx[k]))->numOfRes == -1){
(GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){ qError("result num is too large.");
qError("Unique result num is too large. num: %d, limit: %d", longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_RESULT_TOO_LARGE);
GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE);
} }
// restore it // restore it
...@@ -1276,11 +1274,9 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction ...@@ -1276,11 +1274,9 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
assert(0); assert(0);
} }
if (functionId == TSDB_FUNC_UNIQUE && if (GET_RES_INFO(&(pCtx[k]))->numOfRes == -1){
(GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){ qError("Mode inner result num is too large");
qError("Unique result num is too large. num: %d, limit: %d", longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_RESULT_TOO_LARGE);
GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE);
} }
} }
} }
...@@ -1965,7 +1961,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -1965,7 +1961,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
} }
pCtx->inputType = pSqlExpr->colType; pCtx->inputType = pSqlExpr->colType;
if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX && pSqlExpr->functionId == TSDB_FUNC_UNIQUE){ if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX &&
(pSqlExpr->functionId == TSDB_FUNC_UNIQUE || pSqlExpr->functionId == TSDB_FUNC_MODE)){
pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal; pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal;
}else{ }else{
pCtx->inputBytes = pSqlExpr->colBytes; pCtx->inputBytes = pSqlExpr->colBytes;
...@@ -3690,6 +3687,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i ...@@ -3690,6 +3687,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
pCtx[i].resultInfo = pCellInfo; pCtx[i].resultInfo = pCellInfo;
if (pCtx[i].functionId == TSDB_FUNC_UNIQUE) { if (pCtx[i].functionId == TSDB_FUNC_UNIQUE) {
pCtx[i].pUniqueSet = &pRow->uniqueHash; pCtx[i].pUniqueSet = &pRow->uniqueHash;
}else if (pCtx[i].functionId == TSDB_FUNC_MODE) {
pCtx[i].pModeSet = &pRow->modeHash;
} }
pCtx[i].pOutput = pData->pData; pCtx[i].pOutput = pData->pData;
pCtx[i].currentStage = stage; pCtx[i].currentStage = stage;
...@@ -4027,6 +4026,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe ...@@ -4027,6 +4026,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
if (pCtx[i].functionId == TSDB_FUNC_UNIQUE){ if (pCtx[i].functionId == TSDB_FUNC_UNIQUE){
pCtx[i].pUniqueSet = &pResult->uniqueHash; pCtx[i].pUniqueSet = &pResult->uniqueHash;
}else if (pCtx[i].functionId == TSDB_FUNC_MODE){
pCtx[i].pModeSet = &pResult->modeHash;
} }
SResultRowCellInfo* pResInfo = pCtx[i].resultInfo; SResultRowCellInfo* pResInfo = pCtx[i].resultInfo;
...@@ -4123,6 +4124,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF ...@@ -4123,6 +4124,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
if (pCtx[i].functionId == TSDB_FUNC_UNIQUE) { if (pCtx[i].functionId == TSDB_FUNC_UNIQUE) {
pCtx[i].pUniqueSet = &pResult->uniqueHash; pCtx[i].pUniqueSet = &pResult->uniqueHash;
}else if (pCtx[i].functionId == TSDB_FUNC_MODE) {
pCtx[i].pModeSet = &pResult->modeHash;
} }
} }
} }
......
...@@ -92,6 +92,10 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { ...@@ -92,6 +92,10 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
taosHashCleanup(pResultRowInfo->pResult[i]->uniqueHash); taosHashCleanup(pResultRowInfo->pResult[i]->uniqueHash);
pResultRowInfo->pResult[i]->uniqueHash = NULL; pResultRowInfo->pResult[i]->uniqueHash = NULL;
} }
if (pResultRowInfo->pResult[i]->modeHash){
taosHashCleanup(pResultRowInfo->pResult[i]->modeHash);
pResultRowInfo->pResult[i]->modeHash = NULL;
}
} }
} }
...@@ -205,7 +209,7 @@ SResultRowPool* initResultRowPool(size_t size) { ...@@ -205,7 +209,7 @@ SResultRowPool* initResultRowPool(size_t size) {
qError("ResultRow blockSize is too large:%" PRId64, tmp); qError("ResultRow blockSize is too large:%" PRId64, tmp);
tmp = 128*1024*1024; tmp = 128*1024*1024;
} }
p->blockSize = tmp; p->blockSize = (int32_t)tmp;
p->position.pos = 0; p->position.pos = 0;
p->pData = taosArrayInit(8, POINTER_BYTES); p->pData = taosArrayInit(8, POINTER_BYTES);
......
...@@ -299,7 +299,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha ...@@ -299,7 +299,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE, "Unique result num is too large") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RESULT_TOO_LARGE, "result num is too large")
// grant // grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册