提交 00bd8437 编写于 作者: wmmhello's avatar wmmhello

add unique logic

上级 cf0679a4
......@@ -456,7 +456,7 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
int32_t pg = DEFAULT_PAGE_SIZE;
int32_t overhead = sizeof(tFilePage);
while((pg - overhead) < pModel->rowSize * 2) {
while((pg - overhead) < rlen * 2) {
pg *= 2;
}
......@@ -656,7 +656,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM ||
pCtx[j].functionId == TSDB_FUNC_SAMPLE) {
pCtx[j].functionId == TSDB_FUNC_SAMPLE || pCtx[j].functionId == TSDB_FUNC_UNIQUE) {
if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput;
}
}
......
......@@ -1065,13 +1065,14 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
* if the top/bottom exists, only tags columns, tbname column, and primary timestamp column
* are available.
*/
static bool isTopBottomQuery(SQueryInfo* pQueryInfo) {
static bool isTopBottomUniqueQuery(SQueryInfo* pQueryInfo) {
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM
|| functionId == TSDB_FUNC_UNIQUE) {
return true;
}
}
......@@ -1114,7 +1115,7 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn
const char* msg2 = "top/bottom query does not support order by value in time window query";
// for top/bottom + interval query, we do not add additional timestamp column in the front
if (isTopBottomQuery(pQueryInfo)) {
if (isTopBottomUniqueQuery(pQueryInfo)) {
// invalid sql:
// top(col, k) from table_name [interval(1d)|session(ts, 1d)] order by k asc
......@@ -2689,7 +2690,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg27 = "factor param cannot be negative or equal to 0/1";
const char* msg28 = "the second paramter of diff should be 0 or 1";
switch (functionId) {
case TSDB_FUNC_COUNT: {
/* more than one parameter for count() function */
......@@ -2719,7 +2719,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, false);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size,
false);
} else {
// count the number of table created according to the super table
if (getColumnIndexByName(pToken, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
......@@ -2730,29 +2731,33 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// count tag is equalled to count(tbname)
bool isTag = false;
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta) || index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta) ||
index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
index.columnIndex = TSDB_TBNAME_COLUMN_INDEX;
isTag = true;
}
int32_t size = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, isTag);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size,
isTag);
}
} else { // count(*) is equalled to count(primary_timestamp_key)
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, false);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size,
false);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName));
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token,sizeof(pExpr->base.aliasName) - 1);
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1);
SColumnList list = createColumnList(1, index.tableIndex, index.columnIndex);
if (finalResult) {
int32_t numOfOutput = tscNumOfFields(pQueryInfo);
insertResultField(pQueryInfo, numOfOutput, &list, sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, pExpr->base.aliasName, pExpr);
insertResultField(pQueryInfo, numOfOutput, &list, sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, pExpr->base.aliasName,
pExpr);
} else {
for (int32_t i = 0; i < list.num; ++i) {
SSchema* ps = tscGetTableSchema(pTableMetaInfo->pTableMeta);
......@@ -2783,12 +2788,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_LEASTSQR:
case TSDB_FUNC_ELAPSED: {
// 1. valid the number of parameters
int32_t numOfParams = (pItem->pNode->Expr.paramList == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->Expr.paramList);
int32_t numOfParams =
(pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList);
// no parameters or more than one parameter for function
if (pItem->pNode->Expr.paramList == NULL ||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && functionId != TSDB_FUNC_ELAPSED && functionId != TSDB_FUNC_DIFF
&& numOfParams != 1) ||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && functionId != TSDB_FUNC_ELAPSED &&
functionId != TSDB_FUNC_DIFF && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3) ||
(functionId == TSDB_FUNC_ELAPSED && numOfParams != 1 && numOfParams != 2) ||
(functionId == TSDB_FUNC_DIFF && numOfParams != 1 && numOfParams != 2)) {
......@@ -2796,12 +2802,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0);
if ((pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) || 0 == pParamElem->pNode->columnName.n) {
if ((pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) ||
0 == pParamElem->pNode->columnName.n) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if ((getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS)) {
if ((getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) !=
TSDB_CODE_SUCCESS)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -2810,13 +2818,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// elapsed only can be applied to primary key
if (functionId == TSDB_FUNC_ELAPSED) {
if ( index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX || pColumnSchema->colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX ||
pColumnSchema->colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "elapsed only can be applied to primary key");
}
}
//for timeline related aggregation function like elapsed and twa, groupby in subquery is not allowed
//as calculation result is meaningless by mixing different childtables(timelines) results.
// for timeline related aggregation function like elapsed and twa, groupby in subquery is not allowed
// as calculation result is meaningless by mixing different childtables(timelines) results.
if ((functionId == TSDB_FUNC_ELAPSED || functionId == TSDB_FUNC_TWA) && pQueryInfo->pUpstream != NULL) {
size_t numOfUpstreams = taosArrayGetSize(pQueryInfo->pUpstream);
for (int32_t i = 0; i < numOfUpstreams; ++i) {
......@@ -2830,7 +2839,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
// functions can not be applied to tags
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta))) {
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX ||
(index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta))) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
......@@ -2839,7 +2849,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) &&
(functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
......@@ -2860,10 +2871,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS_DUMMY].name, sizeof(pExpr->base.aliasName));
SColumnList ids = createColumnList(1, 0, 0);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
}
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), intermediateResSize, false);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
intermediateResSize, false);
if (functionId == TSDB_FUNC_LEASTSQR) { // set the leastsquares parameters
char val[8] = {0};
......@@ -2886,14 +2899,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
char val[8] = {0};
int64_t tickPerSec = 0;
char *exprToken = tcalloc(pParamElem[1].pNode->exprToken.n + 1, sizeof(char));
char* exprToken = tcalloc(pParamElem[1].pNode->exprToken.n + 1, sizeof(char));
memcpy(exprToken, pParamElem[1].pNode->exprToken.z, pParamElem[1].pNode->exprToken.n);
if (pParamElem[1].pNode->exprToken.type == TK_NOW || strstr(exprToken, "now")) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
tfree(exprToken);
if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) || tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) ||
tVariantDump(&pParamElem[1].pNode->value, (char*)&tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -2909,7 +2923,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg16);
}
tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
tscExprAddParams(&pExpr->base, (char*)&tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
if (functionId == TSDB_FUNC_DERIVATIVE) {
memset(val, 0, tListLen(val));
......@@ -2917,7 +2931,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int64_t v = *(int64_t*) val;
int64_t v = *(int64_t*)val;
if (v != 0 && v != 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
......@@ -2944,7 +2958,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName));
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token,sizeof(pExpr->base.aliasName) - 1);
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1);
if (finalResult) {
int32_t numOfOutput = tscNumOfFields(pQueryInfo);
......@@ -2968,9 +2982,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// NOTE: has time range condition or normal column filter condition, the last_row query will be transferred to last query
SConvertFunc cvtFunc = {.originFuncId = functionId, .execFuncId = functionId};
if (functionId == TSDB_FUNC_LAST_ROW && ((!TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER)) ||
(hasNormalColumnFilter(pQueryInfo)) ||
taosArrayGetSize(pQueryInfo->pUpstream)>0)) {
if (functionId == TSDB_FUNC_LAST_ROW &&
((!TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER)) || (hasNormalColumnFilter(pQueryInfo)) ||
taosArrayGetSize(pQueryInfo->pUpstream) > 0)) {
cvtFunc.execFuncId = TSDB_FUNC_LAST;
}
......@@ -2979,7 +2993,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (taosArrayGetSize(pItem->pNode->Expr.paramList) > 1 && (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0)) {
if (taosArrayGetSize(pItem->pNode->Expr.paramList) > 1 &&
(pItem->aliasName != NULL && strlen(pItem->aliasName) > 0)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
}
......@@ -3008,14 +3023,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SStrToken t = {.z = pSchema[j].name, .n = (uint32_t)strnlen(pSchema[j].name, TSDB_COL_NAME_LEN)};
setResultColName(name, pItem, cvtFunc.originFuncId, &t, true);
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index,
finalResult, pUdfInfo) != 0) {
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult,
pUdfInfo) != 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
} else {
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) !=
TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -3036,7 +3052,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
bool multiColOutput = taosArrayGetSize(pItem->pNode->Expr.paramList) > 1;
setResultColName(name, pItem, cvtFunc.originFuncId, &pParamElem->pNode->columnName, multiColOutput);
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex++, &index, finalResult, pUdfInfo) != 0) {
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex++, &index, finalResult,
pUdfInfo) != 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
......@@ -3079,18 +3096,22 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_MAVG:
case TSDB_FUNC_SAMPLE:
case TSDB_FUNC_PERCT:
case TSDB_FUNC_APERCT: {
case TSDB_FUNC_APERCT:
case TSDB_FUNC_UNIQUE: {
// 1. valid the number of parameters
bool valid = true;
if(pItem->pNode->Expr.paramList == NULL) {
if (pItem->pNode->Expr.paramList == NULL) {
valid = false;
} else if(functionId == TSDB_FUNC_APERCT) {
} else if (functionId == TSDB_FUNC_APERCT) {
size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList);
if(cnt != 2 && cnt !=3) valid = false;
} else {
if (cnt != 2 && cnt != 3) valid = false;
} else if (functionId == TSDB_FUNC_UNIQUE) {
if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 1)
valid = false;
else {
if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) valid = false;
}
if(!valid) {
if (!valid) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -3100,7 +3121,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) !=
TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -3121,12 +3143,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
tVariant* pVariant = NULL;
if (functionId != TSDB_FUNC_UNIQUE) {
// 3. valid the parameters
if (pParamElem[1].pNode->tokenId == TK_ID) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
tVariant* pVariant = &pParamElem[1].pNode->value;
pVariant = &pParamElem[1].pNode->value;
}
int16_t resultType = pSchema->type;
int32_t resultSize = pSchema->bytes;
......@@ -3137,7 +3162,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SExprInfo* pExpr = NULL;
if (functionId == TSDB_FUNC_PERCT || functionId == TSDB_FUNC_APERCT) {
// param1 double
if(pVariant->nType != TSDB_DATA_TYPE_DOUBLE && pVariant->nType != TSDB_DATA_TYPE_BIGINT){
if (pVariant->nType != TSDB_DATA_TYPE_DOUBLE && pVariant->nType != TSDB_DATA_TYPE_BIGINT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true);
......@@ -3147,8 +3172,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, false,
pUdfInfo);
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0,
false, pUdfInfo);
/*
* sql function transformation
......@@ -3158,7 +3183,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
colIndex += 1; // the first column is ts
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
// param2 int32
......@@ -3166,15 +3192,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (pParamElem[2].pNode != NULL) {
pVariant = &pParamElem[2].pNode->value;
// check type must string
if(pVariant->nType != TSDB_DATA_TYPE_BINARY || pVariant->pz == NULL){
if (pVariant->nType != TSDB_DATA_TYPE_BINARY || pVariant->pz == NULL) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13);
}
char* pzAlgo = pVariant->pz;
int32_t algo = 0;
if(strcasecmp(pzAlgo, "t-digest") == 0) {
if (strcasecmp(pzAlgo, "t-digest") == 0) {
algo = 1;
} else if(strcasecmp(pzAlgo, "default") == 0){
} else if (strcasecmp(pzAlgo, "default") == 0) {
algo = 0;
} else {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg14);
......@@ -3190,7 +3216,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
char* endptr = NULL;
strtoll(pParamElem[1].pNode->exprToken.z, &endptr, 10);
if ((endptr-pParamElem[1].pNode->exprToken.z != pParamElem[1].pNode->exprToken.n) || errno == ERANGE) {
if ((endptr - pParamElem[1].pNode->exprToken.z != pParamElem[1].pNode->exprToken.n) || errno == ERANGE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg18);
}
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
......@@ -3204,21 +3230,35 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// set the first column ts for top/bottom query
int32_t tsFuncId = (functionId == TSDB_FUNC_MAVG) ? TSDB_FUNC_TS_DUMMY : TSDB_FUNC_TS;
SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscExprAppend(pQueryInfo, tsFuncId, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0,
0, false);
pExpr = tscExprAppend(pQueryInfo, tsFuncId, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false);
tstrncpy(pExpr->base.aliasName, aAggs[tsFuncId].name, sizeof(pExpr->base.aliasName));
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
aAggs[tsFuncId].name, pExpr);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[tsFuncId].name,
pExpr);
colIndex += 1; // the first column is ts
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, (int32_t)numRowsSelected, &resultType, &resultSize, &interResult, 0, false,
pUdfInfo);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, (int32_t)numRowsSelected, &resultType,
&resultSize, &interResult, 0, false, pUdfInfo);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
} else if (functionId == TSDB_FUNC_UNIQUE) {
SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false);
tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName));
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
aAggs[TSDB_FUNC_TS].name, pExpr);
colIndex += 1; // the first column is ts
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
resultSize, false);
} else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
......@@ -3230,8 +3270,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// todo REFACTOR
// set the first column ts for top/bottom query
SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0,
0, false);
pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false);
tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName));
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
......@@ -3241,12 +3280,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
colIndex += 1; // the first column is ts
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
resultSize, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
}
memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName));
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token,sizeof(pExpr->base.aliasName) - 1);
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1);
// todo refactor: tscColumnListInsert part
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
......@@ -3276,7 +3316,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tSqlExpr* pParam = pParamItem->pNode;
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pParam->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
if (getColumnIndexByName(&pParam->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) !=
TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -3338,7 +3379,10 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = {.tableIndex = 0, .columnIndex = 0,};
SColumnIndex index = {
.tableIndex = 0,
.columnIndex = 0,
};
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
int32_t inter = 0;
......@@ -3375,7 +3419,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) !=
TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -3390,12 +3435,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
//bin_type param
// bin_type param
if (pParamElem[1].pNode->tokenId == TK_ID) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
tVariant *pVariant = &pParamElem[1].pNode->value;
tVariant* pVariant = &pParamElem[1].pNode->value;
if (pVariant == NULL || pVariant->nType != TSDB_DATA_TYPE_BINARY) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -3414,7 +3459,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} else {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg20);
}
//bin_description param in JSON format
// bin_description param in JSON format
if (pParamElem[2].pNode->tokenId == TK_ID) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -3424,11 +3469,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
cJSON *binDesc = cJSON_Parse(pVariant->pz);
cJSON* binDesc = cJSON_Parse(pVariant->pz);
int32_t counter;
int32_t numBins;
int32_t numOutput;
double *intervals;
double* intervals;
if (cJSON_IsObject(binDesc)) { /* linaer/log bins */
int32_t numOfParams = cJSON_GetArraySize(binDesc);
int32_t startIndex;
......@@ -3436,11 +3481,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22);
}
cJSON *start = cJSON_GetObjectItem(binDesc, "start");
cJSON *factor = cJSON_GetObjectItem(binDesc, "factor");
cJSON *width = cJSON_GetObjectItem(binDesc, "width");
cJSON *count = cJSON_GetObjectItem(binDesc, "count");
cJSON *infinity = cJSON_GetObjectItem(binDesc, "infinity");
cJSON* start = cJSON_GetObjectItem(binDesc, "start");
cJSON* factor = cJSON_GetObjectItem(binDesc, "factor");
cJSON* width = cJSON_GetObjectItem(binDesc, "width");
cJSON* count = cJSON_GetObjectItem(binDesc, "count");
cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity");
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22);
......@@ -3450,10 +3495,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg25);
}
if (isinf(start->valuedouble) ||
(width != NULL && isinf(width->valuedouble)) ||
(factor != NULL && isinf(factor->valuedouble)) ||
(count != NULL && isinf(count->valuedouble))) {
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23);
}
......@@ -3468,7 +3511,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
intervals = tcalloc(numBins, sizeof(double));
if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) {
//linear bin process
// linear bin process
if (width->valuedouble == 0) {
tfree(intervals);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg24);
......@@ -3482,7 +3525,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
startIndex++;
}
} else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) {
//log bin process
// log bin process
if (start->valuedouble == 0) {
tfree(intervals);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg26);
......@@ -3511,7 +3554,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tfree(intervals);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23);
}
//in case of desc bin orders, -inf/inf should be swapped
// in case of desc bin orders, -inf/inf should be swapped
assert(numBins >= 4);
if (intervals[1] > intervals[numBins - 2]) {
SWAP(intervals[0], intervals[numBins - 1], double);
......@@ -3524,7 +3567,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
counter = numBins = cJSON_GetArraySize(binDesc);
intervals = tcalloc(numBins, sizeof(double));
cJSON *bin = binDesc->child;
cJSON* bin = binDesc->child;
if (bin == NULL) {
tfree(intervals);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22);
......@@ -3550,16 +3593,17 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int16_t resultType = pSchema->type;
int32_t resultSize = pSchema->bytes;
int32_t interResult = 0;
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, counter, &resultType, &resultSize, &interResult, 0, false,
pUdfInfo);
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, counter, &resultType, &resultSize, &interResult, 0,
false, pUdfInfo);
SExprInfo* pExpr = NULL;
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult,
false);
numOutput = numBins - 1;
tscExprAddParams(&pExpr->base, (char*)&numOutput, TSDB_DATA_TYPE_INT, sizeof(int32_t));
tscExprAddParams(&pExpr->base, (char*)intervals, TSDB_DATA_TYPE_BINARY, sizeof(double) * numBins);
tfree(intervals);
//normalized param
// normalized param
char val[8] = {0};
if (pParamElem[3].pNode->tokenId == TK_ID) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
......@@ -3604,13 +3648,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13);
}
tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0);;
tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0);
;
if (pParamElem->pNode->tokenId != TK_ID) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) !=
TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -3630,7 +3676,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int32_t bytes = 0;
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, functionId, 0, &resType, &bytes, &inter, 0, false, pUdfInfo);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resType, bytes, getNewResColId(pCmd), inter, false);
SExprInfo* pExpr =
tscExprAppend(pQueryInfo, functionId, &index, resType, bytes, getNewResColId(pCmd), inter, false);
memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName));
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1);
......@@ -3640,7 +3687,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
if (finalResult) {
insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->base.aliasName, pExpr);
insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->base.aliasName,
pExpr);
} else {
for (int32_t i = 0; i < ids.num; ++i) {
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, uid, pSchema);
......@@ -3652,8 +3700,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
// todo refactor
static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex) {
assert(num == 1 && tableIndex >= 0);
......@@ -6614,7 +6662,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
if (isTopBottomQuery(pQueryInfo)) {
if (isTopBottomUniqueQuery(pQueryInfo)) {
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else { // in case of select tbname from super_table, the default order column can not be the primary ts column
pQueryInfo->order.orderColId = INT32_MIN; // todo define a macro
......@@ -6770,7 +6818,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
}
}
if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomQuery(pQueryInfo)) {
if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomUniqueQuery(pQueryInfo)) {
return invalidOperationMsg(pMsgBuf, msg3);
}
......@@ -6780,7 +6828,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
if (tscIsDiffDerivLikeQuery(pQueryInfo)) {
return invalidOperationMsg(pMsgBuf, msg12);
}
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
//pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
......@@ -6792,7 +6840,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
} else if (isTopBottomQuery(pQueryInfo)) {
} else if (isTopBottomUniqueQuery(pQueryInfo)) {
/* order of top/bottom query in interval is not valid */
int32_t pos = tscExprTopBottomIndex(pQueryInfo);
......@@ -6839,7 +6887,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} else {
pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
//pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
pQueryInfo->groupbyExpr.orderType = pItem->sortOrder;
} else if (orderByGroupbyCol){
pQueryInfo->order.order = pItem->sortOrder;
......@@ -6884,7 +6932,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return invalidOperationMsg(pMsgBuf, msg1);
}
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomQuery(pQueryInfo)) {
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomUniqueQuery(pQueryInfo)) {
bool validOrder = false;
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) {
......@@ -6905,11 +6953,11 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
}
CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
//pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
}
if (isTopBottomQuery(pQueryInfo)) {
if (isTopBottomUniqueQuery(pQueryInfo)) {
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) {
SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
......@@ -7983,7 +8031,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
pExpr->base.functionId = TSDB_FUNC_TAG_DUMMY;
tagLength += pExpr->base.resBytes;
} else if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pExpr->base.functionId = TSDB_FUNC_TS_DUMMY;
pExpr->base.functionId = TSDB_FUNC_TS_DUMMY; // ts_select ts,top(col,2)
tagLength += pExpr->base.resBytes;
}
}
......@@ -8421,7 +8469,8 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
if (IS_MULTIOUTPUT(aAggs[f].status) && f != TSDB_FUNC_TOP && f != TSDB_FUNC_BOTTOM && f != TSDB_FUNC_DIFF &&
f != TSDB_FUNC_MAVG && f != TSDB_FUNC_CSUM && f != TSDB_FUNC_SAMPLE &&
f != TSDB_FUNC_DERIVATIVE && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_PRJ) {
f != TSDB_FUNC_DERIVATIVE && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_PRJ &&
f != TSDB_FUNC_UNIQUE) {
return invalidOperationMsg(msg, msg1);
}
......
......@@ -1045,8 +1045,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr;
if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
//pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->groupOrderType = htons(pGroupbyExpr->orderType);
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
......
......@@ -74,11 +74,11 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
break;
case TSDB_DATA_TYPE_UINT:
n = sprintf(str, "%d", *(uint32_t*)buf);
n = sprintf(str, "%u", *(uint32_t*)buf);
break;
case TSDB_DATA_TYPE_UBIGINT:
n = sprintf(str, "%" PRId64, *(uint64_t*)buf);
n = sprintf(str, "%" PRIu64, *(uint64_t*)buf);
break;
case TSDB_DATA_TYPE_FLOAT:
......@@ -304,7 +304,7 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI
return false;
}
// order by columnIndex exists, not a non-ordered projection query
// order by columnIndex not exists, not a ordered projection query
return pQueryInfo->order.orderColId < 0;
}
......@@ -313,7 +313,7 @@ bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableInde
return false;
}
// order by columnIndex exists, a non-ordered projection query
// order by columnIndex exists, a ordered projection query
return pQueryInfo->order.orderColId >= 0;
}
......@@ -689,7 +689,8 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
(functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_TS_COMP ||
functionId == TSDB_FUNC_SAMPLE ||
functionId == TSDB_FUNC_HISTOGRAM)) {
functionId == TSDB_FUNC_HISTOGRAM ||
functionId == TSDB_FUNC_UNIQUE)) {
return true;
}
}
......@@ -5403,7 +5404,7 @@ int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, in
// set json real data
cJSON *root = cJSON_Parse(json);
if (root == NULL){
tscError("json parse error");
tscError("json parse error:%s", json);
return tscSQLSyntaxErrMsg(errMsg, "json parse error", NULL);
}
......
......@@ -293,6 +293,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version")
#define TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"unique result num is too large")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
......
......@@ -503,8 +503,8 @@ typedef struct {
uint32_t tagCondLen; // tag length in current query
int32_t colCondLen; // column length in current query
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
int16_t orderByIdx; // useless
int16_t groupOrderType; // used for group order
int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query.
int64_t limit;
......
......@@ -78,8 +78,9 @@ extern "C" {
#define TSDB_FUNC_ELAPSED 37
#define TSDB_FUNC_HISTOGRAM 38
#define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_MAX_NUM 39
#define TSDB_FUNC_MAX_NUM 40
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......@@ -200,6 +201,8 @@ typedef struct SQLFunctionCtx {
SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
int32_t maxUniqueResult;
} SQLFunctionCtx;
typedef struct SAggFunctionInfo {
......@@ -249,7 +252,7 @@ void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw);
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist);
/* global sql function array */
extern struct SAggFunctionInfo aAggs[40];
extern struct SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM];
extern int32_t functionCompatList[]; // compatible check array list
......
......@@ -221,6 +221,7 @@ typedef struct SQueryAttr {
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool uniqueQuery;
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
......@@ -281,6 +282,7 @@ typedef struct SQueryAttr {
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId;
SArray *pUdfInfo; // no need to free
int32_t maxUniqueResult;
} SQueryAttr;
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
......@@ -730,4 +732,5 @@ void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows);
// tsdb scan table callback table or query is over. param is SQueryRuntimeEnv*
bool qReadOverCB(void* param, int8_t type, int32_t tid);
bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs);
#endif // TDENGINE_QEXECUTOR_H
......@@ -53,14 +53,14 @@ typedef struct tFlushoutInfo {
} tFlushoutInfo;
typedef struct tFlushoutData {
uint32_t nAllocSize;
uint32_t nLength;
tFlushoutInfo *pFlushoutInfo;
uint32_t nAllocSize; // capacity
uint32_t nLength; // size
tFlushoutInfo *pFlushoutInfo; // dynamic allocate
} tFlushoutData;
typedef struct SExtFileInfo {
uint32_t nFileSize; // in pages
uint32_t pageSize;
uint32_t nFileSize; // how many pages in file
//uint32_t pageSize; // useless
uint32_t numOfElemsInFile;
tFlushoutData flushoutData;
} SExtFileInfo;
......
......@@ -78,7 +78,7 @@ typedef struct SDiskbasedResultBuf {
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define MAX_UNIQUE_RESULT_SIZE (1000000)
/**
* create disk-based result buffer
* @param pResultBuf
......
......@@ -50,7 +50,7 @@ typedef struct SGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t numOfGroupCols; // todo remove it
int16_t orderIndex; // order by column index
//int16_t orderIndex; // order by column index, rm useless orderIndex
int16_t orderType; // order by type: asc/desc
} SGroupbyExpr;
......
......@@ -223,6 +223,21 @@ typedef struct{
SHistogramFuncBin* orderedBins;
} SHistogramFuncInfo;
typedef struct {
int64_t timestamp;
char * pTags;
} UniqueUnit;
typedef struct {
SHashObj *pSet;
int32_t num;
char res[];
} SUniqueFuncInfo;
void freeUniqueUnit(void* unit){
tfree(((UniqueUnit *)unit)->pTags);
}
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
......@@ -353,6 +368,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = (sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_UNIQUE) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (sizeof(SUniqueFuncInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = TSDB_DATA_TYPE_BINARY;
......@@ -479,6 +500,14 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
// the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_UNIQUE) {
*type = (int16_t)dataType;
*bytes = dataBytes;
size_t size = sizeof(SUniqueFuncInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
// the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_SAMPLE) {
......@@ -5097,6 +5126,206 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
// unique use the intermediate result buffer to keep the intermediate result
static SUniqueFuncInfo *getUniqueOutputInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage_merge is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return (SUniqueFuncInfo*) pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
}
}
// unique
static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t type) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SUniqueFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
char *tvp = pRes->res;
int32_t step = QUERY_ASC_FORWARD_STEP;
int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes);
switch (type) {
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
int32_t *output = (int32_t *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = ((tValuePair *)tvp)->v.i64;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
int64_t *output = (int64_t *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = ((tValuePair *)tvp)->v.i64;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *output = (double *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = ((tValuePair *)tvp)->v.dKey;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *output = (float *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = ((tValuePair *)tvp)->v.dKey;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *output = (int16_t *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = ((tValuePair *)tvp)->v.i64;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
int8_t *output = (int8_t *)pCtx->pOutput;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = ((tValuePair *)tvp)->v.i64;
tvp += size;
}
break;
}
case TSDB_DATA_TYPE_BINARY:
{
}
case TSDB_DATA_TYPE_NCHAR:
{
}
default: {
qError("top/bottom function not support data type:%d", pCtx->inputType);
return;
}
}
// set the output timestamp of each record.
TSKEY *output = pCtx->ptsOutputBuf;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = ((tValuePair *)tvp)->timestamp;
tvp += size;
}
// set the corresponding tag data for each record
// todo check malloc failure
if (pCtx->tagInfo.numOfTagCols == 0) {
return ;
}
char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES);
for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) {
pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput;
}
for (int32_t i = 0; i < len; ++i, output += step) {
int16_t offset = 0;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
memcpy(pData[j], ((tValuePair *)tvp)->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes);
offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
tvp += size;
}
}
tfree(pData);
}
static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) {
return false;
}
SUniqueFuncInfo *uniqueInfo = getUniqueOutputInfo(pCtx);
uniqueInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
taosHashSetFreeFp(uniqueInfo->pSet, freeUniqueUnit);
return true;
}
static void unique_function(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx);
for (int32_t i = 0; i < pCtx->size; i++) {
char *pData = GET_INPUT_DATA(pCtx, i);
TSKEY k = 0;
if (pCtx->ptsList != NULL) {
k = GET_TS_DATA(pCtx, i);
}
tValuePair *unique = taosHashGet(pInfo->pSet, pData, pCtx->inputBytes);
if (unique == NULL) {
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
tValuePair *tmp = (tValuePair *)(pInfo->res + pInfo->num * size);
if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(&tmp->v, varDataVal(pData), varDataLen(pData), pCtx->inputType);
}else{
tVariantCreateFromBinary(&tmp->v, pData, 0, pCtx->inputType);
}
tmp->timestamp = k;
int32_t offset = 0;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
SQLFunctionCtx *tagCtx = pCtx->tagInfo.pTagCtxList[j];
if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
aAggs[TSDB_FUNC_TAG].xFunction(tagCtx);
memcpy(tmp->pTags + offset, tagCtx->pOutput, tagCtx->outputBytes);
offset += tagCtx->outputBytes;
}
}
taosHashPut(pInfo->pSet, pData, pCtx->inputBytes, &tmp, sizeof(tValuePair*));
pInfo->num++;
}else if(unique->timestamp > k){
unique->timestamp = k;
}
}
}
static void unique_function_merge(SQLFunctionCtx *pCtx) {
//SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx);
//SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx);
// the intermediate result is binary, we only use the output data type
// for (int32_t i = 0; i < pInput->num; ++i) {
// int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->outputType;
// do_top_function_add(pOutput, (int32_t)pCtx->param[0].i64, &pInput->res[i]->v.i64, pInput->res[i]->timestamp,
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
// }
//
// SET_VAL(pCtx, pInput->num, pOutput->num);
//
// if (pOutput->num > 0) {
// SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// pResInfo->hasResult = DATA_SET_FLAG;
// }
}
static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx);
GET_RES_INFO(pCtx)->numOfRes = pInfo->num;
GET_TRUE_DATA_TYPE();
copyUniqueRes(pCtx, type);
doFinalizer(pCtx);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
......@@ -5117,11 +5346,11 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample,
6, 8, -1, -1, -1,
// block_info,elapsed,histogram
7, 1, -1
// block_info,elapsed,histogram,unique
7, 1, -1, -1
};
SAggFunctionInfo aAggs[40] = {{
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
// 0, count function does not invoke the finalize function
"count",
TSDB_FUNC_COUNT,
......@@ -5591,5 +5820,17 @@ SAggFunctionInfo aAggs[40] = {{
histogram_func_finalizer,
histogram_func_merge,
dataBlockRequired,
},
{
// 39
"unique",
TSDB_FUNC_UNIQUE,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_SELECTIVITY,
unique_function_setup,
unique_function,
unique_func_finalizer,
unique_function_merge,
dataBlockRequired,
}
};
......@@ -289,9 +289,8 @@ static int compareRowData(const void *a, const void *b, const void *userData) {
}
static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock, SQLFunctionCtx *pCtx) {
SArray *columnOrderList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
size_t size = taosArrayGetSize(columnOrderList);
taosArrayDestroy(&columnOrderList);
int32_t size = pRuntimeEnv->pQueryAttr->pGroupbyExpr == NULL? 0: pRuntimeEnv->pQueryAttr->pGroupbyExpr->numOfGroupCols;
if (pRuntimeEnv->pQueryAttr->interval.interval > 0) size++;
if (size <= 0) {
return;
......@@ -1004,6 +1003,13 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
}
if (functionId == TSDB_FUNC_UNIQUE && GET_RES_INFO(&(pCtx[k]))->numOfRes > pQueryAttr->maxUniqueResult){
qError("Unique result num is too large. num: %d, limit: %d",
GET_RES_INFO(&(pCtx[k]))->numOfRes, pQueryAttr->maxUniqueResult);
aAggs[functionId].xFinalize(&pCtx[k]);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE);
}
// restore it
pCtx[k].preAggVals.isSet = hasAggregates;
pCtx[k].pInput = start;
......@@ -1263,6 +1269,14 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
} else {
assert(0);
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
if (functionId == TSDB_FUNC_UNIQUE && GET_RES_INFO(&(pCtx[k]))->numOfRes > pQueryAttr->maxUniqueResult){
qError("Unique result num is too large. num: %d, limit: %d",
GET_RES_INFO(&(pCtx[k]))->numOfRes, pQueryAttr->maxUniqueResult);
aAggs[functionId].xFinalize(&pCtx[k]);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE);
}
}
}
}
......@@ -1893,7 +1907,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
continue;
}
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { //ts_select ts,top(col,2)
tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i];
} else if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
......@@ -1962,6 +1976,9 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->end.key = INT64_MIN;
pCtx->startTs = INT64_MIN;
if (pCtx->functionId == TSDB_FUNC_UNIQUE){
pCtx->maxUniqueResult = pQueryAttr->maxUniqueResult;
}
pCtx->numOfParams = pSqlExpr->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
int16_t type = pSqlExpr->param[j].nType;
......@@ -2754,7 +2771,15 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t MIN_ROWS_PER_PAGE = 4;
if (pQueryAttr->uniqueQuery) {
int64_t rowSize = pQueryAttr->resultRowSize;
while(rowSize*pQueryAttr->maxUniqueResult > 1024*1024*100){
pQueryAttr->maxUniqueResult = pQueryAttr->maxUniqueResult >> 1u;
}
*rowsize = (int32_t)(rowSize*pQueryAttr->maxUniqueResult);
}else{
*rowsize = (int32_t)(pQueryAttr->resultRowSize * getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
}
int32_t overhead = sizeof(tFilePage);
// one page contains at least two rows
......@@ -3177,7 +3202,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery) && (!pQueryAttr->pointInterpQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery) && (!pQueryAttr->pointInterpQuery) && (!pQueryAttr->uniqueQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput,
pRuntimeEnv->current->groupIndex);
......@@ -3671,7 +3696,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
// set the timestamp output buffer for top/bottom/diff query
int32_t fid = pCtx[i].functionId;
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE ||
fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM) {
fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM || fid == TSDB_FUNC_UNIQUE) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
} else if (fid == TSDB_FUNC_INTERP) {
assert(pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pCtx[0].functionId == TSDB_FUNC_TS);
......@@ -3742,7 +3767,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_SAMPLE ) {
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_UNIQUE) {
if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
} else if (functionId == TSDB_FUNC_INTERP) {
assert(pBInfo->pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pBInfo->pCtx[0].functionId == TSDB_FUNC_TS);
......@@ -3918,6 +3943,15 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
}
}
bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs) {
for (int32_t i = 0; i < numOfOutput; ++i) {
if (pExprs[i].base.functionId == TSDB_FUNC_UNIQUE) {
return true;
}
}
return false;
}
static bool hasMainOutput(SQueryAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = pQueryAttr->pExpr1[i].base.functionId;
......@@ -4002,7 +4036,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE) {
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE ||
functionId == TSDB_FUNC_UNIQUE) {
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
}
......@@ -4071,7 +4106,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_CSUM) {
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_UNIQUE) {
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
}
......@@ -5511,10 +5547,6 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
}
if (pQuery->interval.interval > 0) {
if (pOrderColumns == NULL) {
pOrderColumns = taosArrayInit(1, sizeof(SColIndex));
}
SColIndex colIndex = {.colIndex = 0, .colId = 0, .flag = TSDB_COL_NORMAL};
taosArrayPush(pOrderColumns, &colIndex);
}
......@@ -8753,8 +8785,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pMsg += tListLen(param->pGroupColIndex[i].name);
}
pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx);
pQueryMsg->orderType = htons(pQueryMsg->orderType);
//pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx);
pQueryMsg->groupOrderType = htons(pQueryMsg->groupOrderType);
}
pQueryMsg->fillType = htons(pQueryMsg->fillType);
......@@ -9331,8 +9363,8 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo
}
pGroupbyExpr->numOfGroupCols = pQueryMsg->numOfGroupCols;
pGroupbyExpr->orderType = pQueryMsg->orderType;
pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx;
pGroupbyExpr->orderType = pQueryMsg->groupOrderType;
//pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx;
pGroupbyExpr->columnInfo = taosArrayInit(pQueryMsg->numOfGroupCols, sizeof(SColIndex));
for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
......@@ -9547,6 +9579,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->vgId = vgId;
pQueryAttr->pFilters = pFilters;
pQueryAttr->range = pQueryMsg->range;
pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pExprs);
pQueryAttr->maxUniqueResult = MAX_UNIQUE_RESULT_SIZE;
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQueryAttr->tableCols == NULL) {
......
......@@ -46,7 +46,7 @@ tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, int32_t p
SExtFileInfo *pFMeta = &pMemBuffer->fileMeta;
pFMeta->pageSize = DEFAULT_PAGE_SIZE;
//pFMeta->pageSize = DEFAULT_PAGE_SIZE;
pFMeta->flushoutData.nAllocSize = 4;
pFMeta->flushoutData.nLength = 0;
......
......@@ -77,7 +77,7 @@ static SQueryNode* createQueryNode(int32_t type, const char* name, SQueryNode**
pGroupbyExpr->tableIndex = p->tableIndex;
pGroupbyExpr->orderType = p->orderType;
pGroupbyExpr->orderIndex = p->orderIndex;
//pGroupbyExpr->orderIndex = p->orderIndex;
pGroupbyExpr->numOfGroupCols = p->numOfGroupCols;
pGroupbyExpr->columnInfo = taosArrayDup(p->columnInfo);
pNode->pExtInfo = pGroupbyExpr;
......
......@@ -20,7 +20,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa
pResBuf->pageSize = pagesize;
pResBuf->numOfPages = 0; // all pages are in buffer in the first place
pResBuf->totalBufSize = 0;
pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit.
pResBuf->inMemPages = inMemBufSize/pagesize + 1; // maximum allowed pages, it is a soft limit.
pResBuf->allocateId = -1;
pResBuf->comp = true;
pResBuf->file = NULL;
......@@ -28,7 +28,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa
pResBuf->fileSize = 0;
// at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2);
// assert(inMemBufSize >= pagesize * 2);
pResBuf->lruList = tdListNew(POINTER_BYTES);
......@@ -257,7 +257,7 @@ static char* evicOneDataPage(SDiskbasedResultBuf* pResultBuf) {
int32_t prev = pResultBuf->inMemPages;
// increase by 50% of previous mem pages
pResultBuf->inMemPages = (int32_t)(pResultBuf->inMemPages * 1.5f);
pResultBuf->inMemPages = (int32_t)(pResultBuf->inMemPages * 1.5f) + 1; // if pResultBuf->inMemPages == 1, *1.5 always == 1
qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev,
pResultBuf->inMemPages, pResultBuf->pageSize);
......
......@@ -44,6 +44,9 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
}
}
if (pQueryAttr->uniqueQuery){
return pQueryAttr->maxUniqueResult;
}
return 1;
}
......@@ -154,7 +157,7 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i];
int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resType;
int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resBytes;
char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
memset(s, 0, size);
......@@ -217,7 +220,6 @@ SResultRow* getNewResultRow(SResultRowPool* p) {
}
p->position.pos = (p->position.pos + 1)%p->numOfElemPerBlock;
initResultRow(ptr);
return ptr;
}
......@@ -451,9 +453,7 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
}
}
void
orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
__compar_fn_t fn = NULL;
if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) {
fn = tsAscOrder;
......
......@@ -299,6 +299,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE, "Unique result num is too large")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册