提交 7d499b69 编写于 作者: wmmhello's avatar wmmhello

fix errors in mode function

上级 2aff89c7
...@@ -231,15 +231,15 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) ...@@ -231,15 +231,15 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes)
int32_t tscGetResRowLength(SArray* pExprList); int32_t tscGetResRowLength(SArray* pExprList);
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); int16_t size, int16_t resColId, int32_t interSize, bool isTagCol);
SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType); int16_t size, int16_t resColId, int32_t interSize, int32_t colType);
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes); void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); int16_t size, int16_t resColId, int32_t interSize, bool isTagCol);
SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int32_t size); int32_t size);
......
...@@ -2855,7 +2855,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2855,7 +2855,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX ){ if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX ){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) { } else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) &&
(functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
...@@ -4013,7 +4013,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -4013,7 +4013,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
(functionId == TSDB_FUNC_SAMPLE) || (functionId == TSDB_FUNC_SAMPLE) ||
(functionId == TSDB_FUNC_ELAPSED) || (functionId == TSDB_FUNC_ELAPSED) ||
(functionId == TSDB_FUNC_HISTOGRAM) || (functionId == TSDB_FUNC_HISTOGRAM) ||
(functionId == TSDB_FUNC_UNIQUE)) { (functionId == TSDB_FUNC_UNIQUE) ||
(functionId == TSDB_FUNC_MODE)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes, if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
......
...@@ -2535,7 +2535,7 @@ void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArr ...@@ -2535,7 +2535,7 @@ void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArr
SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType) { int16_t size, int16_t resColId, int32_t interSize, int32_t colType) {
SExprInfo* pExpr = calloc(1, sizeof(SExprInfo)); SExprInfo* pExpr = calloc(1, sizeof(SExprInfo));
if (pExpr == NULL) { if (pExpr == NULL) {
return NULL; return NULL;
...@@ -2592,7 +2592,7 @@ SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SCo ...@@ -2592,7 +2592,7 @@ SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SCo
} }
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) { int16_t size, int16_t resColId, int32_t interSize, bool isTagCol) {
int32_t num = (int32_t)taosArrayGetSize(pQueryInfo->exprList); int32_t num = (int32_t)taosArrayGetSize(pQueryInfo->exprList);
if (index == num) { if (index == num) {
return tscExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); return tscExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
...@@ -2605,7 +2605,7 @@ SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t function ...@@ -2605,7 +2605,7 @@ SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t function
} }
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) { int16_t size, int16_t resColId, int32_t interSize, bool isTagCol) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
SExprInfo* pExpr = tscExprCreate(pTableMetaInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); SExprInfo* pExpr = tscExprCreate(pTableMetaInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
taosArrayPush(pQueryInfo->exprList, &pExpr); taosArrayPush(pQueryInfo->exprList, &pExpr);
...@@ -4937,7 +4937,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu ...@@ -4937,7 +4937,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
pse->colInfo.colIndex = i; pse->colInfo.colIndex = i;
pse->colType = pExpr->base.resType; pse->colType = pExpr->base.resType;
if(pExpr->base.resBytes > INT16_MAX && pExpr->base.functionId == TSDB_FUNC_UNIQUE){ if(pExpr->base.resBytes > INT16_MAX &&
(pExpr->base.functionId == TSDB_FUNC_UNIQUE || pExpr->base.functionId == TSDB_FUNC_MODE)){
pQueryAttr->interBytesForGlobal = pExpr->base.resBytes; pQueryAttr->interBytesForGlobal = pExpr->base.resBytes;
}else{ }else{
pse->colBytes = pExpr->base.resBytes; pse->colBytes = pExpr->base.resBytes;
......
...@@ -5301,7 +5301,7 @@ static bool mode_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn ...@@ -5301,7 +5301,7 @@ static bool mode_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
return true; return true;
} }
static void do_mode_function(SQLFunctionCtx *pCtx, SModeFuncInfo *pInfo, char *pData, int32_t bytes, int16_t type){ static void do_mode_function(SQLFunctionCtx *pCtx, SModeFuncInfo *pInfo, char *pData, int64_t count, int32_t bytes, int16_t type){
int32_t hashKeyBytes = bytes; int32_t hashKeyBytes = bytes;
if(IS_VAR_DATA_TYPE(type)){ // for var data, we can not use bytes, because there are dirty data in the back of var data if(IS_VAR_DATA_TYPE(type)){ // for var data, we can not use bytes, because there are dirty data in the back of var data
hashKeyBytes = varDataTLen(pData); hashKeyBytes = varDataTLen(pData);
...@@ -5310,14 +5310,14 @@ static void do_mode_function(SQLFunctionCtx *pCtx, SModeFuncInfo *pInfo, char *p ...@@ -5310,14 +5310,14 @@ static void do_mode_function(SQLFunctionCtx *pCtx, SModeFuncInfo *pInfo, char *p
if (mode == NULL) { if (mode == NULL) {
size_t size = sizeof(ModeUnit) + bytes; size_t size = sizeof(ModeUnit) + bytes;
char *tmp = pInfo->res + pInfo->num * size; char *tmp = pInfo->res + pInfo->num * size;
((ModeUnit*)tmp)->count = 1; ((ModeUnit*)tmp)->count = count;
char *data = tmp + sizeof(ModeUnit); char *data = tmp + sizeof(ModeUnit);
memcpy(data, pData, bytes); memcpy(data, pData, bytes);
taosHashPut(*pCtx->pModeSet, pData, hashKeyBytes, &tmp, sizeof(ModeUnit*)); taosHashPut(*pCtx->pModeSet, pData, hashKeyBytes, &tmp, sizeof(ModeUnit*));
pInfo->num++; pInfo->num++;
}else{ }else{
(*mode)->count++; (*mode)->count += count;
} }
} }
...@@ -5326,8 +5326,11 @@ static void mode_function(SQLFunctionCtx *pCtx) { ...@@ -5326,8 +5326,11 @@ static void mode_function(SQLFunctionCtx *pCtx) {
for (int32_t i = 0; i < pCtx->size; i++) { for (int32_t i = 0; i < pCtx->size; i++) {
char *pData = GET_INPUT_DATA(pCtx, i); char *pData = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
continue;
}
do_mode_function(pCtx, pInfo, pData, pCtx->inputBytes, pCtx->inputType); do_mode_function(pCtx, pInfo, pData, 1, pCtx->inputBytes, pCtx->inputType);
if (sizeof(SModeFuncInfo) + pInfo->num * (sizeof(ModeUnit) + pCtx->inputBytes) >= MAX_MODE_INNER_RESULT_SIZE){ if (sizeof(SModeFuncInfo) + pInfo->num * (sizeof(ModeUnit) + pCtx->inputBytes) >= MAX_MODE_INNER_RESULT_SIZE){
GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory
...@@ -5344,7 +5347,7 @@ static void mode_function_merge(SQLFunctionCtx *pCtx) { ...@@ -5344,7 +5347,7 @@ static void mode_function_merge(SQLFunctionCtx *pCtx) {
for (int32_t i = 0; i < pInput->num; ++i) { for (int32_t i = 0; i < pInput->num; ++i) {
char *tmp = pInput->res + i* size; char *tmp = pInput->res + i* size;
char *data = tmp + sizeof(ModeUnit); char *data = tmp + sizeof(ModeUnit);
do_mode_function(pCtx, pOutput, data, pCtx->outputBytes, pCtx->outputType); do_mode_function(pCtx, pOutput, data, ((ModeUnit*)tmp)->count, pCtx->outputBytes, pCtx->outputType);
if (sizeof(SModeFuncInfo) + pOutput->num * (sizeof(ModeUnit) + pCtx->outputBytes) >= MAX_MODE_INNER_RESULT_SIZE){ if (sizeof(SModeFuncInfo) + pOutput->num * (sizeof(ModeUnit) + pCtx->outputBytes) >= MAX_MODE_INNER_RESULT_SIZE){
GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory
...@@ -5355,11 +5358,14 @@ static void mode_function_merge(SQLFunctionCtx *pCtx) { ...@@ -5355,11 +5358,14 @@ static void mode_function_merge(SQLFunctionCtx *pCtx) {
static void mode_func_finalizer(SQLFunctionCtx *pCtx) { static void mode_func_finalizer(SQLFunctionCtx *pCtx) {
int32_t bytes = 0; int32_t bytes = 0;
int32_t type = 0;
if (pCtx->currentStage == MERGE_STAGE) { if (pCtx->currentStage == MERGE_STAGE) {
bytes = pCtx->outputBytes; bytes = pCtx->outputBytes;
type = pCtx->outputType;
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
} else { } else {
bytes = pCtx->inputBytes; bytes = pCtx->inputBytes;
type = pCtx->inputType;
} }
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
...@@ -5383,11 +5389,10 @@ static void mode_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5383,11 +5389,10 @@ static void mode_func_finalizer(SQLFunctionCtx *pCtx) {
if (result){ if (result){
memcpy(pCtx->pOutput, result + sizeof(ModeUnit), bytes); memcpy(pCtx->pOutput, result + sizeof(ModeUnit), bytes);
pResInfo->numOfRes = 1;
}else{ }else{
pResInfo->numOfRes = 0; setNull(pCtx->pOutput, type, 0);
} }
pResInfo->numOfRes = 1;
doFinalizer(pCtx); doFinalizer(pCtx);
} }
...@@ -5903,7 +5908,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ ...@@ -5903,7 +5908,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
"mode", "mode",
TSDB_FUNC_MODE, TSDB_FUNC_MODE,
TSDB_FUNC_MODE, TSDB_FUNC_MODE,
TSDB_FUNCSTATE_SO, TSDB_BASE_FUNC_SO,
mode_function_setup, mode_function_setup,
mode_function, mode_function,
mode_func_finalizer, mode_func_finalizer,
......
...@@ -1961,7 +1961,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -1961,7 +1961,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
} }
pCtx->inputType = pSqlExpr->colType; pCtx->inputType = pSqlExpr->colType;
if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX && pSqlExpr->functionId == TSDB_FUNC_UNIQUE){ if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX &&
(pSqlExpr->functionId == TSDB_FUNC_UNIQUE || pSqlExpr->functionId == TSDB_FUNC_MODE)){
pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal; pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal;
}else{ }else{
pCtx->inputBytes = pSqlExpr->colBytes; pCtx->inputBytes = pSqlExpr->colBytes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册