提交 25aa2933 编写于 作者: wmmhello's avatar wmmhello

modify unique function like top

上级 dfad8aca
......@@ -593,7 +593,7 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
}
}
static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) {
static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) {
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex;
}
......@@ -605,12 +605,20 @@ static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, i
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(((SMultiwayMergeInfo*)(pInfo->info))->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
} else {
assert(!TSDB_FUNC_IS_SCALAR(functionId));
aAggs[functionId].mergeFunc(&pCtx[j]);
}
SQueryAttr* pQueryAttr = pInfo->pRuntimeEnv->pQueryAttr;
if (functionId == TSDB_FUNC_UNIQUE &&
(GET_RES_INFO(&(pCtx[j]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[j]))->numOfRes == -1)){
tscError("Unique result num is too large. num: %d, limit: %d",
GET_RES_INFO(&(pCtx[j]))->numOfRes, MAX_UNIQUE_RESULT_ROWS);
longjmp(pInfo->pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE);
}
}
}
......@@ -644,7 +652,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr);
} else {
doFinalizeResultImpl(pInfo, pCtx, numOfExpr);
......@@ -671,10 +679,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
}
}
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr);
}
} else {
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr);
}
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
......
......@@ -3106,9 +3106,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList);
if (cnt != 2 && cnt != 3) valid = false;
} else if (functionId == TSDB_FUNC_UNIQUE) {
if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 1)
valid = false;
else {
if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 1) valid = false;
}else {
if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) valid = false;
}
if (!valid) {
......@@ -3245,20 +3244,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
} else if (functionId == TSDB_FUNC_UNIQUE) {
SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false);
tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName));
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
aAggs[TSDB_FUNC_TS].name, pExpr);
colIndex += 1; // the first column is ts
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
resultSize, false);
} else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
......@@ -3267,6 +3252,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12);
}
if(functionId == TSDB_FUNC_UNIQUE){
GET_INT32_VAL(val) = MAX_UNIQUE_RESULT_ROWS;
}
// todo REFACTOR
// set the first column ts for top/bottom query
SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
......@@ -3445,10 +3433,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
#define USER_INPUT_BIN 0
#define LINEAR_BIN 1
#define LOG_BIN 2
#define USER_INPUT_BIN 0
#define LINEAR_BIN 1
#define LOG_BIN 2
int8_t binType;
if (strcasecmp(pVariant->pz, "user_input") == 0) {
binType = USER_INPUT_BIN;
......@@ -3700,7 +3687,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
// todo refactor
static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex) {
......
......@@ -202,7 +202,7 @@ typedef struct SQLFunctionCtx {
SPoint1 start;
SPoint1 end;
int32_t maxUniqueResult;
SHashObj *pUniqueSet; // for unique function
} SQLFunctionCtx;
typedef struct SAggFunctionInfo {
......
......@@ -78,7 +78,8 @@ typedef struct SDiskbasedResultBuf {
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define MAX_UNIQUE_RESULT_SIZE (1000000)
#define MAX_UNIQUE_RESULT_ROWS (10000)
#define MAX_UNIQUE_RESULT_SIZE (1024*1024*20)
/**
* create disk-based result buffer
* @param pResultBuf
......
......@@ -225,19 +225,14 @@ typedef struct{
typedef struct {
int64_t timestamp;
char * pTags;
char data[];
} UniqueUnit;
typedef struct {
SHashObj *pSet;
int32_t num;
char res[];
} SUniqueFuncInfo;
void freeUniqueUnit(void* unit){
tfree(((UniqueUnit *)unit)->pTags);
}
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
......@@ -371,7 +366,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_UNIQUE) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (sizeof(SUniqueFuncInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param);
int64_t size = sizeof(UniqueUnit) + dataBytes + extLength;
size *= param;
size += sizeof(SUniqueFuncInfo);
if (size > MAX_UNIQUE_RESULT_SIZE){
size = MAX_UNIQUE_RESULT_SIZE;
}
*bytes = size;
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -498,17 +499,19 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = (int16_t)dataType;
*bytes = dataBytes;
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + extLength) * param;
// the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_UNIQUE) {
*type = (int16_t)dataType;
*bytes = dataBytes;
size_t size = sizeof(SUniqueFuncInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
// the output column may be larger than sizeof(STopBotInfo)
int64_t size = sizeof(UniqueUnit) + dataBytes + extLength;
size *= param;
size += sizeof(SUniqueFuncInfo);
if (size > MAX_UNIQUE_RESULT_SIZE){
size = MAX_UNIQUE_RESULT_SIZE;
}
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType;
......@@ -5143,87 +5146,19 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t type) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SUniqueFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen;
char *tvp = pRes->res;
int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes);
switch (type) {
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
int32_t *output = (int32_t *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output ++) {
*output = ((tValuePair *)tvp)->v.i64;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:{
int64_t *output = (int64_t *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output ++) {
*output = ((tValuePair *)tvp)->v.i64;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *output = (double *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output ++) {
*output = ((tValuePair *)tvp)->v.dKey;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *output = (float *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output ++) {
*output = ((tValuePair *)tvp)->v.dKey;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *output = (int16_t *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output ++) {
*output = ((tValuePair *)tvp)->v.i64;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL:{
int8_t *output = (int8_t *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output ++) {
*output = ((tValuePair *)tvp)->v.i64;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
char *tsOutput = pCtx->ptsOutputBuf;
char *output = pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output += pCtx->outputBytes) {
*output = ((tValuePair *)tvp)->v.i64;
memcpy(output, ((tValuePair *)tvp)->v.pz, ((tValuePair *)tvp)->v.nLen);
tvp += size;
}
break;
}
default: {
qError("unique function not support data type:%d", pCtx->inputType);
return;
}
}
// set the output timestamp of each record.
TSKEY *output = pCtx->ptsOutputBuf;
for (int32_t i = 0; i < len; ++i, output ++) {
*output = ((tValuePair *)tvp)->timestamp;
for (int32_t i = 0; i < len; ++i) {
memcpy(tsOutput, tvp, sizeof(int64_t));
memcpy(output, tvp + sizeof(UniqueUnit), pCtx->inputBytes);
tvp += size;
tsOutput += sizeof(int64_t);
output += pCtx->inputBytes;
}
// set the corresponding tag data for each record
......@@ -5237,14 +5172,15 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t type) {
pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput;
}
for (int32_t i = 0; i < len; ++i, output ++) {
int16_t offset = 0;
tvp = pRes->res;
for (int32_t i = 0; i < len; ++i) {
int16_t offset = sizeof(UniqueUnit) + pCtx->inputBytes;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
memcpy(pData[j], ((tValuePair *)tvp)->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes);
memcpy(pData[j], tvp + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes);
offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
tvp += size;
}
tvp += size;
}
tfree(pData);
......@@ -5254,69 +5190,79 @@ static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
if (!function_setup(pCtx, pResInfo)) {
return false;
}
SUniqueFuncInfo *uniqueInfo = getUniqueOutputInfo(pCtx);
uniqueInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
taosHashSetFreeFp(uniqueInfo->pSet, freeUniqueUnit);
return true;
}
static void unique_function(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx);
for (int32_t i = 0; i < pCtx->size; i++) {
char *pData = GET_INPUT_DATA(pCtx, i);
TSKEY k = 0;
if (pCtx->ptsList != NULL) {
k = GET_TS_DATA(pCtx, i);
}
tValuePair *unique = taosHashGet(pInfo->pSet, pData, pCtx->inputBytes);
static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag){
tValuePair *unique = taosHashGet(pCtx->pUniqueSet, pData, pCtx->inputBytes);
if (unique == NULL) {
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
tValuePair *tmp = (tValuePair *)(pInfo->res + pInfo->num * size);
if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(&tmp->v, varDataVal(pData), varDataLen(pData), pCtx->inputType);
size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen;
char *tmp = pInfo->res + pInfo->num * size;
((UniqueUnit*)tmp)->timestamp = timestamp;
char *data = tmp + sizeof(UniqueUnit);
char *tags = tmp + sizeof(UniqueUnit) + pCtx->inputBytes;
memcpy(data, pData, pCtx->inputBytes);
if (pCtx->currentStage == MERGE_STAGE && tag != NULL) {
memcpy(tags, tag, (size_t)pCtx->tagInfo.tagsLen);
}else{
tVariantCreateFromBinary(&tmp->v, pData, 0, pCtx->inputType);
}
tmp->timestamp = k;
int32_t offset = 0;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
SQLFunctionCtx *tagCtx = pCtx->tagInfo.pTagCtxList[j];
if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
aAggs[TSDB_FUNC_TAG].xFunction(tagCtx);
memcpy(tmp->pTags + offset, tagCtx->pOutput, tagCtx->outputBytes);
memcpy(tags + offset, tagCtx->pOutput, tagCtx->outputBytes);
offset += tagCtx->outputBytes;
}
}
}
taosHashPut(pInfo->pSet, pData, pCtx->inputBytes, &tmp, sizeof(tValuePair*));
taosHashPut(pCtx->pUniqueSet, pData, pCtx->inputBytes, &tmp, sizeof(tValuePair*));
pInfo->num++;
}else if(unique->timestamp > k){
unique->timestamp = k;
}else if(unique->timestamp > timestamp){
unique->timestamp = timestamp;
}
}
static void unique_function(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx);
for (int32_t i = 0; i < pCtx->size; i++) {
char *pData = GET_INPUT_DATA(pCtx, i);
TSKEY k = 0;
if (pCtx->ptsList != NULL) {
k = GET_TS_DATA(pCtx, i);
}
do_unique_function(pCtx, pInfo, k, pData, NULL);
if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){
GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory
return;
}
}
GET_RES_INFO(pCtx)->numOfRes = 1;
}
static void unique_function_merge(SQLFunctionCtx *pCtx) {
//SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx);
//SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx);
// the intermediate result is binary, we only use the output data type
// for (int32_t i = 0; i < pInput->num; ++i) {
// int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->outputType;
// do_top_function_add(pOutput, (int32_t)pCtx->param[0].i64, &pInput->res[i]->v.i64, pInput->res[i]->timestamp,
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
// }
//
// SET_VAL(pCtx, pInput->num, pOutput->num);
//
// if (pOutput->num > 0) {
// SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// pResInfo->hasResult = DATA_SET_FLAG;
// }
SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx);
SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx);
size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen;
for (int32_t i = 0; i < pInput->num; ++i) {
char *tmp = pInput->res + i* size;
TSKEY timestamp = ((UniqueUnit*)tmp)->timestamp;
char *data = tmp + sizeof(UniqueUnit);
char *tags = tmp + sizeof(UniqueUnit) + pCtx->inputBytes;
do_unique_function(pCtx, pOutput, timestamp, data, tags);
if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){
GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory
return;
}
}
GET_RES_INFO(pCtx)->numOfRes = pOutput->num;
}
static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
......@@ -5328,7 +5274,6 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
......
......@@ -356,7 +356,13 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
idata.info.bytes = pExpr[i].base.resBytes;
idata.info.colId = pExpr[i].base.resColId;
int32_t size = MAX(idata.info.bytes * numOfRows, minSize);
int64_t tmp = idata.info.bytes;
tmp *= numOfRows;
if (tmp >= 1024*1024*1024) { // 1G
qError("size is too large, failed to allocate column buffer for output buffer");
goto _clean;
}
int32_t size = MAX(tmp, minSize);
idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform
if (idata.pData == NULL) {
qError("failed to allocate column buffer for output buffer");
......@@ -1003,10 +1009,10 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
}
if (functionId == TSDB_FUNC_UNIQUE && GET_RES_INFO(&(pCtx[k]))->numOfRes > pQueryAttr->maxUniqueResult){
if (functionId == TSDB_FUNC_UNIQUE &&
(GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){
qError("Unique result num is too large. num: %d, limit: %d",
GET_RES_INFO(&(pCtx[k]))->numOfRes, pQueryAttr->maxUniqueResult);
aAggs[functionId].xFinalize(&pCtx[k]);
GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE);
}
......@@ -1271,10 +1277,10 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
if (functionId == TSDB_FUNC_UNIQUE && GET_RES_INFO(&(pCtx[k]))->numOfRes > pQueryAttr->maxUniqueResult){
if (functionId == TSDB_FUNC_UNIQUE &&
(GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){
qError("Unique result num is too large. num: %d, limit: %d",
GET_RES_INFO(&(pCtx[k]))->numOfRes, pQueryAttr->maxUniqueResult);
aAggs[functionId].xFinalize(&pCtx[k]);
GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE);
}
}
......@@ -1976,9 +1982,6 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->end.key = INT64_MIN;
pCtx->startTs = INT64_MIN;
if (pCtx->functionId == TSDB_FUNC_UNIQUE){
pCtx->maxUniqueResult = pQueryAttr->maxUniqueResult;
}
pCtx->numOfParams = pSqlExpr->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
int16_t type = pSqlExpr->param[j].nType;
......@@ -2028,6 +2031,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == TSDB_FUNC_SCALAR_EXPR) {
pCtx->param[1].pz = (char*) &pRuntimeEnv->sasArray[i];
} else if (functionId == TSDB_FUNC_UNIQUE){
pCtx->pUniqueSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
}
......@@ -2052,6 +2057,9 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
tVariantDestroy(&pCtx[i].tag);
tfree(pCtx[i].tagInfo.pTagCtxList);
if (pCtx[i].functionId == TSDB_FUNC_UNIQUE){
taosHashClear(pCtx[i].pUniqueSet);
}
}
tfree(pCtx);
......@@ -2771,15 +2779,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t MIN_ROWS_PER_PAGE = 4;
if (pQueryAttr->uniqueQuery) {
int64_t rowSize = pQueryAttr->resultRowSize;
while(rowSize*pQueryAttr->maxUniqueResult > 1024*1024*100){
pQueryAttr->maxUniqueResult = pQueryAttr->maxUniqueResult >> 1u;
}
*rowsize = (int32_t)(rowSize*pQueryAttr->maxUniqueResult);
}else{
*rowsize = (int32_t)(pQueryAttr->resultRowSize * getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
}
int32_t overhead = sizeof(tFilePage);
// one page contains at least two rows
......@@ -8994,7 +8994,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol
for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t functId = pExprs[i].base.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE) {
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE || funcIf == TSDB_FUNC_UNIQUE) {
int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols);
if (j < 0 || j >= pTableInfo->numOfCols) {
return TSDB_CODE_QRY_INVALID_MSG;
......@@ -9580,7 +9580,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->pFilters = pFilters;
pQueryAttr->range = pQueryMsg->range;
pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pExprs);
pQueryAttr->maxUniqueResult = MAX_UNIQUE_RESULT_SIZE;
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQueryAttr->tableCols == NULL) {
......
......@@ -45,7 +45,7 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
}
if (pQueryAttr->uniqueQuery){
return pQueryAttr->maxUniqueResult;
return MAX_UNIQUE_RESULT_ROWS;
}
return 1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册