提交 ae8edc50 编写于 作者: wmmhello's avatar wmmhello

modify unique logic, add error tips & fix order by logic

上级 9dc99879
......@@ -1113,6 +1113,7 @@ static int32_t addPrimaryTsColumnForTimeWindowQuery(SQueryInfo* pQueryInfo, SSql
static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "invalid query expression";
const char* msg2 = "top/bottom query does not support order by value in time window query";
const char* msg3 = "unique function does not supportted in time window query";
// for top/bottom + interval query, we do not add additional timestamp column in the front
if (isTopBottomUniqueQuery(pQueryInfo)) {
......@@ -1138,6 +1139,9 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn
if (pExpr->base.functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->base.colInfo.flag)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pExpr->base.functionId == TSDB_FUNC_UNIQUE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
/*
......@@ -2689,6 +2693,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg26 = "start param cannot be 0 with 'log_bin'";
const char* msg27 = "factor param cannot be negative or equal to 0/1";
const char* msg28 = "the second paramter of diff should be 0 or 1";
const char* msg29 = "key timestamp column cannot be used to unique function";
switch (functionId) {
case TSDB_FUNC_COUNT: {
......@@ -3128,6 +3133,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && functionId == TSDB_FUNC_UNIQUE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
......@@ -8376,6 +8384,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
const char* msg4 = "retrieve tags not compatible with group by or interval query";
const char* msg5 = "functions can not be mixed up";
const char* msg6 = "TWA/Diff/Derivative/Irate/CSum/MAvg/Elapsed only support group by tbname";
const char* msg7 = "unique function does not supportted in state window query";
// only retrieve tags, group by is not supportted
if (tscQueryTags(pQueryInfo)) {
......@@ -8455,6 +8464,10 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
}
}
if (pQueryInfo->stateWindow && f == TSDB_FUNC_UNIQUE){
return invalidOperationMsg(msg, msg7);
}
if (IS_MULTIOUTPUT(aAggs[f].status) && f != TSDB_FUNC_TOP && f != TSDB_FUNC_BOTTOM && f != TSDB_FUNC_DIFF &&
f != TSDB_FUNC_MAVG && f != TSDB_FUNC_CSUM && f != TSDB_FUNC_SAMPLE &&
f != TSDB_FUNC_DERIVATIVE && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_PRJ &&
......
......@@ -2162,7 +2162,7 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
int32_t step = QUERY_ASC_FORWARD_STEP;
int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes);
switch (type) {
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
......@@ -5147,16 +5147,16 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) {
SUniqueFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen;
char *tvp = pRes->res;
int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes);
char *tsOutput = pCtx->ptsOutputBuf;
char *output = pCtx->pOutput;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->param[2].i64);
char *tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1));
for (int32_t i = 0; i < len; ++i) {
memcpy(tsOutput, tvp, sizeof(int64_t));
memcpy(output, tvp + sizeof(UniqueUnit), bytes);
tvp += size;
tvp += (step * size);
tsOutput += sizeof(int64_t);
output += bytes;
}
......@@ -5172,7 +5172,7 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) {
pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput;
}
tvp = pRes->res;
tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1));
for (int32_t i = 0; i < len; ++i) {
int16_t offset = sizeof(UniqueUnit) + bytes;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
......@@ -5180,7 +5180,7 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) {
offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
}
tvp += size;
tvp += (step * size);
}
tfree(pData);
......@@ -5269,17 +5269,42 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes = pOutput->num;
}
typedef struct{
int32_t dataOffset;
__compar_fn_t comparFn;
} UiqueSupporter;
static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param) {
UiqueSupporter *support = (UiqueSupporter *)param;
return support->comparFn(p1 + support->dataOffset, p2 + support->dataOffset);
}
static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx);
GET_RES_INFO(pCtx)->numOfRes = pInfo->num;
int32_t bytes = 0;
int32_t type = 0;
if (pCtx->currentStage == MERGE_STAGE) {
bytes = pCtx->outputBytes;
type = pCtx->outputType;
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
} else {
bytes = pCtx->inputBytes;
type = pCtx->inputType;
}
UiqueSupporter support = {0};
// user specify the order of output by sort the result according to timestamp
if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
support.dataOffset = 0;
support.comparFn = compareInt64Val;
} else{
support.dataOffset = sizeof(UniqueUnit);
support.comparFn = getComparFunc(type, 0);
}
size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen;
taosqsort(pInfo->res, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, uniqueCompareFn);
copyUniqueRes(pCtx, bytes);
doFinalizer(pCtx);
}
......
......@@ -2003,7 +2003,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
// set the order information for top/bottom query
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM
|| functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_UNIQUE) {
int32_t f = pExpr[i-1].base.functionId;
assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册