未验证 提交 5e473650 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #10451 from taosdata/feature/TD-11214

Feature/td 11214
...@@ -650,48 +650,49 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -650,48 +650,49 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
} }
for(int32_t i = 0; i < pBlock->info.rows; ++i) { for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) { if (!pInfo->hasPrev) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr);
doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
} else { continue;
doFinalizeResultImpl(pInfo, pCtx, numOfExpr); }
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr);
continue;
}
pInfo->binfo.pRes->info.rows += numOfRows; doFinalizeResultImpl(pInfo, pCtx, numOfExpr);
for(int32_t j = 0; j < numOfExpr; ++j) { int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM ||
pCtx[j].functionId == TSDB_FUNC_SAMPLE || pCtx[j].functionId == TSDB_FUNC_UNIQUE) {
if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput;
}
}
for(int32_t j = 0; j < numOfExpr; ++j) { pInfo->binfo.pRes->info.rows += numOfRows;
if (pCtx[j].functionId < 0) {
continue;
}
{
assert(!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId));
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
}
}
doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM ||
pCtx[j].functionId == TSDB_FUNC_SAMPLE || pCtx[j].functionId == TSDB_FUNC_UNIQUE ||
pCtx[j].functionId == TSDB_FUNC_TAIL) {
if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput;
}
}
for(int32_t j = 0; j < numOfExpr; ++j) {
if (pCtx[j].functionId < 0) {
continue;
}
{
assert(!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId));
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
} }
} else {
doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr);
} }
doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr);
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
} }
{ for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { pCtx[i].pInput = addrPtr[i];
pCtx[i].pInput = addrPtr[i];
}
} }
tfree(addrPtr); tfree(addrPtr);
...@@ -899,6 +900,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -899,6 +900,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
SMultiwayMergeInfo *pAggInfo = pOperator->info; SMultiwayMergeInfo *pAggInfo = pOperator->info;
SOperatorInfo *upstream = pOperator->upstream[0]; SOperatorInfo *upstream = pOperator->upstream[0];
SQueryAttr *pQueryAttr = pOperator->pRuntimeEnv->pQueryAttr;
*newgroup = false; *newgroup = false;
bool handleData = false; bool handleData = false;
...@@ -909,7 +911,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -909,7 +911,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
pAggInfo->hasPrev = false; // now we start from a new group data set. pAggInfo->hasPrev = false; // now we start from a new group data set.
// not belongs to the same group, return the result of current group; // not belongs to the same group, return the result of current group;
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, pQueryAttr->order.order);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv, true); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv, true);
{ // reset output buffer { // reset output buffer
...@@ -955,13 +957,13 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -955,13 +957,13 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
*newgroup = true; *newgroup = true;
pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->hasDataBlockForNewGroup = true;
pAggInfo->pExistBlock = pBlock; pAggInfo->pExistBlock = pBlock;
savePrevOrderColumns(pAggInfo->prevRow, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasPrev); savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData);
break; break;
} }
} }
// not belongs to the same group, return the result of current group // not belongs to the same group, return the result of current group
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv, true); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv, true);
doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock); doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock);
......
...@@ -1072,7 +1072,7 @@ static bool isTopBottomUniqueQuery(SQueryInfo* pQueryInfo) { ...@@ -1072,7 +1072,7 @@ static bool isTopBottomUniqueQuery(SQueryInfo* pQueryInfo) {
int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId; int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM
|| functionId == TSDB_FUNC_UNIQUE) { || functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL) {
return true; return true;
} }
} }
...@@ -2643,8 +2643,8 @@ static void updateLastScanOrderIfNeeded(SQueryInfo* pQueryInfo) { ...@@ -2643,8 +2643,8 @@ static void updateLastScanOrderIfNeeded(SQueryInfo* pQueryInfo) {
} }
pExpr->base.numOfParams = 1; pExpr->base.numOfParams = 1;
pExpr->base.param->i64 = TSDB_ORDER_ASC; pExpr->base.param[0].i64 = TSDB_ORDER_ASC;
pExpr->base.param->nType = TSDB_DATA_TYPE_INT; pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT;
} }
} }
} }
...@@ -2693,7 +2693,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2693,7 +2693,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg26 = "start param cannot be 0 with 'log_bin'"; const char* msg26 = "start param cannot be 0 with 'log_bin'";
const char* msg27 = "factor param cannot be negative or equal to 0/1"; const char* msg27 = "factor param cannot be negative or equal to 0/1";
const char* msg28 = "the second paramter of diff should be 0 or 1"; const char* msg28 = "the second paramter of diff should be 0 or 1";
const char* msg29 = "key timestamp column cannot be used to unique/mode function"; const char* msg29 = "key timestamp column cannot be used to unique/mode/tail function";
const char* msg30 = "offset is out of range [0, 100]";
switch (functionId) { switch (functionId) {
case TSDB_FUNC_COUNT: { case TSDB_FUNC_COUNT: {
...@@ -2853,7 +2854,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2853,7 +2854,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// 2. check if sql function can be applied on this column data type // 2. check if sql function can be applied on this column data type
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX ){ if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pColumnSchema->type == TSDB_DATA_TYPE_TIMESTAMP){
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
} else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) { } else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
...@@ -3105,12 +3107,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -3105,12 +3107,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_SAMPLE: case TSDB_FUNC_SAMPLE:
case TSDB_FUNC_PERCT: case TSDB_FUNC_PERCT:
case TSDB_FUNC_APERCT: case TSDB_FUNC_APERCT:
case TSDB_FUNC_UNIQUE: { case TSDB_FUNC_UNIQUE:
case TSDB_FUNC_TAIL: {
// 1. valid the number of parameters // 1. valid the number of parameters
bool valid = true; bool valid = true;
if (pItem->pNode->Expr.paramList == NULL) { if (pItem->pNode->Expr.paramList == NULL) {
valid = false; valid = false;
} else if (functionId == TSDB_FUNC_APERCT) { } else if (functionId == TSDB_FUNC_APERCT || functionId == TSDB_FUNC_TAIL) {
size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList); size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList);
if (cnt != 2 && cnt != 3) valid = false; if (cnt != 2 && cnt != 3) valid = false;
} else if (functionId == TSDB_FUNC_UNIQUE) { } else if (functionId == TSDB_FUNC_UNIQUE) {
...@@ -3136,20 +3139,22 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -3136,20 +3139,22 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && functionId == TSDB_FUNC_UNIQUE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && pSchema->type == TSDB_DATA_TYPE_TIMESTAMP &&
(functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29);
}
// functions can not be applied to tags // functions can not be applied to tags
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
// 2. valid the column type // 2. valid the column type
if (functionId != TSDB_FUNC_SAMPLE && functionId != TSDB_FUNC_UNIQUE && !IS_NUMERIC_TYPE(pSchema->type)) { if (functionId != TSDB_FUNC_SAMPLE && functionId != TSDB_FUNC_UNIQUE && functionId != TSDB_FUNC_TAIL && !IS_NUMERIC_TYPE(pSchema->type)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
...@@ -3258,13 +3263,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -3258,13 +3263,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} else { } else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
int64_t numRowsSelected = GET_INT32_VAL(val); int64_t numRowsSelected = GET_INT64_VAL(val);
if (functionId != TSDB_FUNC_UNIQUE && (numRowsSelected <= 0 || numRowsSelected > 100)) { // todo use macro if (functionId != TSDB_FUNC_UNIQUE && (numRowsSelected <= 0 || numRowsSelected > 100)) { // todo use macro
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12);
} }
if(functionId == TSDB_FUNC_UNIQUE){ if(functionId == TSDB_FUNC_UNIQUE){
GET_INT32_VAL(val) = MAX_UNIQUE_RESULT_ROWS; GET_INT64_VAL(val) = MAX_UNIQUE_RESULT_ROWS;
} }
// todo REFACTOR // todo REFACTOR
// set the first column ts for top/bottom query // set the first column ts for top/bottom query
...@@ -3281,7 +3286,25 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -3281,7 +3286,25 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
resultSize, false); resultSize, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); if (functionId == TSDB_FUNC_TAIL){
int64_t offset = 0;
if (taosArrayGetSize(pItem->pNode->Expr.paramList) == 3){
tSqlExprItem* para = taosArrayGet(pItem->pNode->Expr.paramList, 2);
if (para->pNode->tokenId == TK_ID || para->pNode->value.nType != TSDB_DATA_TYPE_BIGINT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
offset = para->pNode->value.i64;
if (offset < 0 || offset > 100) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg30);
}
}
GET_INT64_VAL(val) = numRowsSelected + offset;
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
GET_INT64_VAL(val) = offset;
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
}else{
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
}
} }
memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName));
...@@ -4010,7 +4033,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -4010,7 +4033,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
(functionId == TSDB_FUNC_ELAPSED) || (functionId == TSDB_FUNC_ELAPSED) ||
(functionId == TSDB_FUNC_HISTOGRAM) || (functionId == TSDB_FUNC_HISTOGRAM) ||
(functionId == TSDB_FUNC_UNIQUE) || (functionId == TSDB_FUNC_UNIQUE) ||
(functionId == TSDB_FUNC_MODE)) { (functionId == TSDB_FUNC_MODE) ||
(functionId == TSDB_FUNC_TAIL)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes, if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
...@@ -6686,7 +6710,6 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -6686,7 +6710,6 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
const char* msg6 = "only primary timestamp allowed as the second order column"; const char* msg6 = "only primary timestamp allowed as the second order column";
const char* msg7 = "only primary timestamp/column in groupby clause allowed as order column"; const char* msg7 = "only primary timestamp/column in groupby clause allowed as order column";
const char* msg8 = "only column in groupby clause allowed as order column"; const char* msg8 = "only column in groupby clause allowed as order column";
const char* msg9 = "orderby column must projected in subquery";
const char* msg10 = "not support distinct mixed with order by"; const char* msg10 = "not support distinct mixed with order by";
const char* msg11 = "not support order with udf"; const char* msg11 = "not support order with udf";
const char* msg12 = "order by tags not supported with diff/derivative/csum/mavg"; const char* msg12 = "order by tags not supported with diff/derivative/csum/mavg";
...@@ -6819,87 +6842,48 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -6819,87 +6842,48 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return invalidOperationMsg(pMsgBuf, msg3); return invalidOperationMsg(pMsgBuf, msg3);
} }
size_t s = taosArrayGetSize(pSortOrder); if (orderByTags) {
if (s == 1) { if (tscIsDiffDerivLikeQuery(pQueryInfo)) {
if (orderByTags) { return invalidOperationMsg(pMsgBuf, msg12);
if (tscIsDiffDerivLikeQuery(pQueryInfo)) { }
return invalidOperationMsg(pMsgBuf, msg12); //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
}
//pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
} else if (orderByGroupbyCol) {
CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
} else if (isTopBottomUniqueQuery(pQueryInfo)) {
/* order of top/bottom query in interval is not valid */
int32_t pos = tscExprTopBottomIndex(pQueryInfo);
assert(pos > 0);
SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1);
assert(pExpr->base.functionId == TSDB_FUNC_TS);
pExpr = tscExprGet(pQueryInfo, pos); pQueryInfo->groupbyExpr.orderType = pItem->sortOrder;
} else if (orderByGroupbyCol) {
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { pQueryInfo->groupbyExpr.orderType = pItem->sortOrder;
return invalidOperationMsg(pMsgBuf, msg5); if (udf) {
} return invalidOperationMsg(pMsgBuf, msg11);
}
} else if (isTopBottomUniqueQuery(pQueryInfo)) {
/* order of top/bottom query in interval is not valid */
CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); int32_t pos = tscExprTopBottomIndex(pQueryInfo);
pQueryInfo->order.order = p1->sortOrder; assert(pos > 0);
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1);
return TSDB_CODE_SUCCESS; assert(pExpr->base.functionId == TSDB_FUNC_TS);
} else {
CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
if (udf) { pExpr = tscExprGet(pQueryInfo, pos);
return invalidOperationMsg(pMsgBuf, msg11);
}
pQueryInfo->order.order = p1->sortOrder; if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; return invalidOperationMsg(pMsgBuf, msg5);
// orderby ts query on super table
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool found = false;
for (int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
found = true;
break;
}
}
if (!found && pQueryInfo->pDownstream) {
return invalidOperationMsg(pMsgBuf, msg9);
}
addPrimaryTsColIntoResult(pQueryInfo, pCmd);
}
} }
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
} else { } else {
pItem = taosArrayGet(pSqlNode->pSortOrder, 0); if (udf) {
if (orderByTags) { return invalidOperationMsg(pMsgBuf, msg11);
//pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
pQueryInfo->groupbyExpr.orderType = pItem->sortOrder;
} else if (orderByGroupbyCol){
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = index.columnIndex;
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
} else {
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
} }
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
// orderby ts query on super table
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
addPrimaryTsColIntoResult(pQueryInfo, pCmd);
}
}
if(taosArrayGetSize(pSortOrder) == 2){
SStrToken cname = {0}; SStrToken cname = {0};
pItem = taosArrayGet(pSqlNode->pSortOrder, 1); pItem = taosArrayGet(pSqlNode->pSortOrder, 1);
if (pItem->isJsonExp){ if (pItem->isJsonExp){
...@@ -6918,12 +6902,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -6918,12 +6902,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(pMsgBuf, msg6); return invalidOperationMsg(pMsgBuf, msg6);
} else {
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} }
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} }
} else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table } else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table
if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) { if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsgBuf, msg1); return invalidOperationMsg(pMsgBuf, msg1);
...@@ -6945,13 +6927,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -6945,13 +6927,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return invalidOperationMsg(pMsgBuf, msg11); return invalidOperationMsg(pMsgBuf, msg11);
} }
if (udf) { pQueryInfo->groupbyExpr.orderType = pItem->sortOrder;
return invalidOperationMsg(pMsgBuf, msg11);
}
CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
//pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
} }
if (isTopBottomUniqueQuery(pQueryInfo)) { if (isTopBottomUniqueQuery(pQueryInfo)) {
...@@ -6974,19 +6950,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -6974,19 +6950,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return invalidOperationMsg(pMsgBuf, msg5); return invalidOperationMsg(pMsgBuf, msg5);
} }
} }
pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
return TSDB_CODE_SUCCESS;
} }
if (udf) { if (udf) {
return invalidOperationMsg(pMsgBuf, msg11); return invalidOperationMsg(pMsgBuf, msg11);
} }
pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
} else { } else {
...@@ -7023,7 +6992,6 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -7023,7 +6992,6 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
} }
} }
pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
} }
...@@ -8485,7 +8453,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* ...@@ -8485,7 +8453,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
if (IS_MULTIOUTPUT(aAggs[f].status) && f != TSDB_FUNC_TOP && f != TSDB_FUNC_BOTTOM && f != TSDB_FUNC_DIFF && if (IS_MULTIOUTPUT(aAggs[f].status) && f != TSDB_FUNC_TOP && f != TSDB_FUNC_BOTTOM && f != TSDB_FUNC_DIFF &&
f != TSDB_FUNC_MAVG && f != TSDB_FUNC_CSUM && f != TSDB_FUNC_SAMPLE && f != TSDB_FUNC_MAVG && f != TSDB_FUNC_CSUM && f != TSDB_FUNC_SAMPLE &&
f != TSDB_FUNC_DERIVATIVE && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_PRJ && f != TSDB_FUNC_DERIVATIVE && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_PRJ &&
f != TSDB_FUNC_UNIQUE) { f != TSDB_FUNC_UNIQUE && f != TSDB_FUNC_TAIL) {
return invalidOperationMsg(msg, msg1); return invalidOperationMsg(msg, msg1);
} }
...@@ -10089,7 +10057,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -10089,7 +10057,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column"; const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
const char* msg5 = "only tag query not compatible with normal column filter"; const char* msg5 = "only tag query not compatible with normal column filter";
const char* msg6 = "not support stddev/percentile in the outer query yet"; const char* msg6 = "not support stddev/percentile in the outer query yet";
const char* msg7 = "derivative/twa/rate/irate/diff requires timestamp column exists in subquery"; const char* msg7 = "derivative/twa/rate/irate/diff/tail requires timestamp column exists in subquery";
const char* msg8 = "condition missing for join query"; const char* msg8 = "condition missing for join query";
const char* msg9 = "not support 3 level select"; const char* msg9 = "not support 3 level select";
...@@ -10158,12 +10126,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -10158,12 +10126,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) { if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
if ((timeWindowQuery || pQueryInfo->stateWindow) && f == TSDB_FUNC_LAST) {
pExpr->base.numOfParams = 1;
pExpr->base.param[0].i64 = TSDB_ORDER_ASC;
pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT;
}
} }
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
...@@ -10177,7 +10139,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -10177,7 +10139,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
int32_t f = pExpr->base.functionId; int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE ||
f == TSDB_FUNC_RATE || f == TSDB_FUNC_DIFF) { f == TSDB_FUNC_RATE || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_TAIL) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
} }
} }
......
...@@ -690,7 +690,8 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { ...@@ -690,7 +690,8 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
functionId == TSDB_FUNC_TS_COMP || functionId == TSDB_FUNC_TS_COMP ||
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_SAMPLE ||
functionId == TSDB_FUNC_HISTOGRAM || functionId == TSDB_FUNC_HISTOGRAM ||
functionId == TSDB_FUNC_UNIQUE)) { functionId == TSDB_FUNC_UNIQUE ||
functionId == TSDB_FUNC_TAIL)) {
return true; return true;
} }
} }
...@@ -2659,7 +2660,7 @@ int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo){ ...@@ -2659,7 +2660,7 @@ int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo){
if (pExpr == NULL) if (pExpr == NULL)
continue; continue;
if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM
|| pExpr->base.functionId == TSDB_FUNC_UNIQUE) { || pExpr->base.functionId == TSDB_FUNC_UNIQUE || pExpr->base.functionId == TSDB_FUNC_TAIL) {
return i; return i;
} }
} }
...@@ -4938,7 +4939,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu ...@@ -4938,7 +4939,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
pse->colType = pExpr->base.resType; pse->colType = pExpr->base.resType;
if(pExpr->base.resBytes > INT16_MAX && if(pExpr->base.resBytes > INT16_MAX &&
(pExpr->base.functionId == TSDB_FUNC_UNIQUE || pExpr->base.functionId == TSDB_FUNC_MODE)){ (pExpr->base.functionId == TSDB_FUNC_UNIQUE || pExpr->base.functionId == TSDB_FUNC_MODE
|| pExpr->base.functionId == TSDB_FUNC_TAIL)){
pQueryAttr->interBytesForGlobal = pExpr->base.resBytes; pQueryAttr->interBytesForGlobal = pExpr->base.resBytes;
}else{ }else{
pse->colBytes = pExpr->base.resBytes; pse->colBytes = pExpr->base.resBytes;
...@@ -5118,8 +5120,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -5118,8 +5120,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
} }
} }
pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pQueryAttr->pExpr1);
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SColumnInfo)); pQueryAttr->tableCols = calloc(numOfCols, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i); SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
......
...@@ -80,8 +80,9 @@ extern "C" { ...@@ -80,8 +80,9 @@ extern "C" {
#define TSDB_FUNC_HISTOGRAM 38 #define TSDB_FUNC_HISTOGRAM 38
#define TSDB_FUNC_UNIQUE 39 #define TSDB_FUNC_UNIQUE 39
#define TSDB_FUNC_MODE 40 #define TSDB_FUNC_MODE 40
#define TSDB_FUNC_TAIL 41
#define TSDB_FUNC_MAX_NUM 41 #define TSDB_FUNC_MAX_NUM 42
#define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......
...@@ -223,7 +223,6 @@ typedef struct SQueryAttr { ...@@ -223,7 +223,6 @@ typedef struct SQueryAttr {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag bool topBotQuery; // TODO used bitwise flag
bool uniqueQuery;
bool groupbyColumn; // denote if this is a groupby normal column query bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo;// if the time window start/end required interpolation
...@@ -734,5 +733,4 @@ void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows); ...@@ -734,5 +733,4 @@ void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows);
// tsdb scan table callback table or query is over. param is SQueryRuntimeEnv* // tsdb scan table callback table or query is over. param is SQueryRuntimeEnv*
bool qReadOverCB(void* param, int8_t type, int32_t tid); bool qReadOverCB(void* param, int8_t type, int32_t tid);
bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs);
#endif // TDENGINE_QEXECUTOR_H #endif // TDENGINE_QEXECUTOR_H
...@@ -243,6 +243,16 @@ typedef struct { ...@@ -243,6 +243,16 @@ typedef struct {
char res[]; char res[];
} SModeFuncInfo; } SModeFuncInfo;
typedef struct {
int64_t timestamp;
char data[];
} TailUnit;
typedef struct STailInfo {
int32_t num;
TailUnit **res;
} STailInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) { if (!isValidDataType(dataType)) {
...@@ -387,17 +397,23 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -387,17 +397,23 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_MODE) { } else if (functionId == TSDB_FUNC_MODE) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
int64_t size = sizeof(ModeUnit) + dataBytes; int64_t size = sizeof(ModeUnit) + dataBytes;
size *= MAX_MODE_INNER_RESULT_ROWS; size *= MAX_MODE_INNER_RESULT_ROWS;
size += sizeof(SModeFuncInfo); size += sizeof(SModeFuncInfo);
if (size > MAX_MODE_INNER_RESULT_SIZE){ if (size > MAX_MODE_INNER_RESULT_SIZE){
size = MAX_MODE_INNER_RESULT_SIZE; size = MAX_MODE_INNER_RESULT_SIZE;
} }
*bytes = (int32_t)size; *bytes = (int32_t)size;
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TAIL) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (sizeof(STailInfo) + (sizeof(TailUnit) + dataBytes + POINTER_BYTES + extLength) * param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SAMPLE) { } else if (functionId == TSDB_FUNC_SAMPLE) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = (sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param); *bytes = (sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param);
...@@ -521,7 +537,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -521,7 +537,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = (int16_t)dataType; *type = (int16_t)dataType;
*bytes = dataBytes; *bytes = dataBytes;
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + extLength) * param; size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
// the output column may be larger than sizeof(STopBotInfo) // the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size; *interBytes = (int32_t)size;
...@@ -545,8 +561,15 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -545,8 +561,15 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
size = MAX_MODE_INNER_RESULT_SIZE; size = MAX_MODE_INNER_RESULT_SIZE;
} }
*interBytes = (int32_t)size; *interBytes = (int32_t)size;
return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_TAIL) {
}else if (functionId == TSDB_FUNC_SAMPLE) { *type = (int16_t)dataType;
*bytes = dataBytes;
size_t size = (sizeof(STailInfo) + (sizeof(TailUnit) + dataBytes + POINTER_BYTES + extLength) * param);
// the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType; *type = (int16_t)dataType;
*bytes = dataBytes; *bytes = dataBytes;
size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param; size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param;
...@@ -945,6 +968,23 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_ ...@@ -945,6 +968,23 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_
} }
} }
static int32_t tailFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
// not initialized yet, it is the first block, load it.
if (pCtx->pOutput == NULL) {
return BLK_DATA_ALL_NEEDED;
}
// the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is
// the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid
STailInfo *pInfo = (STailInfo*) (pCtx->pOutput);
TailUnit **pList = pInfo->res;
if (pInfo->num >= pCtx->param[0].i64 && pList[0]->timestamp > w->ekey){
return BLK_DATA_NO_NEEDED;
} else {
return BLK_DATA_ALL_NEEDED;
}
}
////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////
/* /*
* The intermediate result of average is kept in the interResultBuf. * The intermediate result of average is kept in the interResultBuf.
...@@ -2506,11 +2546,11 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2506,11 +2546,11 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
tValuePair **tvp = pRes->res; tValuePair **tvp = pRes->res;
// user specify the order of output by sort the result according to timestamp // user specify the order of output by sort the result according to timestamp
if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pCtx->param[2].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; __compar_fn_t comparator = (pCtx->param[3].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn;
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
} else /*if (pCtx->param[1].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/ { } else /*if (pCtx->param[2].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/ {
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; __compar_fn_t comparator = (pCtx->param[3].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn;
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
} }
...@@ -5116,21 +5156,18 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5116,21 +5156,18 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
// unique // unique&tail copy
static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) { static void copyRes(SQLFunctionCtx *pCtx, void *data, int32_t bytes) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); size_t size = sizeof(int64_t) + bytes + pCtx->tagInfo.tagsLen;
SUniqueFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen;
int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes); int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes);
char *tsOutput = pCtx->ptsOutputBuf; char *tsOutput = pCtx->ptsOutputBuf;
char *output = pCtx->pOutput; char *output = pCtx->pOutput;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->param[2].i64); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->param[3].i64);
char *tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1)); char *tvp = data + (size * ((pCtx->param[3].i64 == TSDB_ORDER_ASC) ? 0 : len -1));
for (int32_t i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
memcpy(tsOutput, tvp, sizeof(int64_t)); memcpy(tsOutput, tvp, sizeof(int64_t));
memcpy(output, tvp + sizeof(UniqueUnit), bytes); memcpy(output, tvp + sizeof(int64_t), bytes);
tvp += (step * size); tvp += (step * size);
tsOutput += sizeof(int64_t); tsOutput += sizeof(int64_t);
output += bytes; output += bytes;
...@@ -5147,9 +5184,9 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) { ...@@ -5147,9 +5184,9 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) {
pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput;
} }
tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1)); tvp = data + (size * ((pCtx->param[3].i64 == TSDB_ORDER_ASC) ? 0 : len -1));
for (int32_t i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
int32_t offset = (int32_t)sizeof(UniqueUnit) + bytes; int32_t offset = (int32_t)sizeof(int64_t) + bytes;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
memcpy(pData[j], tvp + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); memcpy(pData[j], tvp + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes);
offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes; offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
...@@ -5257,10 +5294,10 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) { ...@@ -5257,10 +5294,10 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) {
typedef struct{ typedef struct{
int32_t dataOffset; int32_t dataOffset;
__compar_fn_t comparFn; __compar_fn_t comparFn;
} UiqueSupporter; } SortSupporter;
static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param) { static int32_t sortCompareFn(const void *p1, const void *p2, const void *param) {
UiqueSupporter *support = (UiqueSupporter *)param; SortSupporter *support = (SortSupporter *)param;
return support->comparFn((const char*)p1 + support->dataOffset, (const char*)p2 + support->dataOffset); return support->comparFn((const char*)p1 + support->dataOffset, (const char*)p2 + support->dataOffset);
} }
...@@ -5278,19 +5315,19 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5278,19 +5315,19 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) {
bytes = pCtx->inputBytes; bytes = pCtx->inputBytes;
type = pCtx->inputType; type = pCtx->inputType;
} }
UiqueSupporter support = {0}; SortSupporter support = {0};
// user specify the order of output by sort the result according to timestamp // user specify the order of output by sort the result according to timestamp
if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pCtx->param[2].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
support.dataOffset = 0; support.dataOffset = 0;
support.comparFn = compareInt64Val; support.comparFn = compareInt64Val;
} else{ } else{
support.dataOffset = sizeof(UniqueUnit); support.dataOffset = sizeof(int64_t);
support.comparFn = getComparFunc(type, 0); support.comparFn = getComparFunc(type, 0);
} }
size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; size_t size = sizeof(int64_t) + bytes + pCtx->tagInfo.tagsLen;
taosqsort(pInfo->res, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, uniqueCompareFn); taosqsort(pInfo->res, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn);
copyUniqueRes(pCtx, bytes); copyRes(pCtx, pInfo->res, bytes);
doFinalizer(pCtx); doFinalizer(pCtx);
} }
...@@ -5402,6 +5439,194 @@ static void mode_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5402,6 +5439,194 @@ static void mode_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
static void buildTailStruct(STailInfo *pTailInfo, SQLFunctionCtx *pCtx) {
char *tmp = (char *)pTailInfo + sizeof(STailInfo);
pTailInfo->res = (TailUnit**) tmp;
tmp += POINTER_BYTES * pCtx->param[0].i64;
int32_t bytes = 0;
if (pCtx->currentStage == MERGE_STAGE) {
bytes = pCtx->outputBytes;
} else {
bytes = pCtx->inputBytes;
}
size_t size = sizeof(TailUnit) + bytes + pCtx->tagInfo.tagsLen;
for (int32_t i = 0; i < pCtx->param[0].i64; ++i) {
pTailInfo->res[i] = (TailUnit*) tmp;
tmp += size;
}
}
static void valueTailAssign(TailUnit *dst, int32_t bytes, const char *val, int64_t tsKey,
SExtTagsInfo *pTagInfo, char *pTags, int16_t stage) {
dst->timestamp = tsKey;
memcpy(dst->data, val, bytes);
if (stage == MERGE_STAGE) {
memcpy(dst->data + bytes, pTags, (size_t)pTagInfo->tagsLen);
} else { // the tags are dumped from the ctx tag fields
int32_t size = 0;
for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) {
SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i];
if (ctx->functionId == TSDB_FUNC_TS_DUMMY) {
ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
ctx->tag.i64 = tsKey;
}
tVariantDump(&ctx->tag, ctx->pOutput, ctx->tag.nType, true);
memcpy(dst->data + bytes + size, ctx->pOutput, ctx->outputBytes);
size += ctx->outputBytes;
}
}
}
static int32_t tailComparFn(const void *p1, const void *p2, const void *param) {
TailUnit *d1 = *(TailUnit **) p1;
TailUnit *d2 = *(TailUnit **) p2;
return compareInt64Val(d1, d2);
}
static void tailSwapFn(void *dst, void *src, const void *param)
{
TailUnit **vdst = (TailUnit **) dst;
TailUnit **vsrc = (TailUnit **) src;
TailUnit *tmp = *vdst;
*vdst = *vsrc;
*vsrc = tmp;
}
static void do_tail_function_add(STailInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, int32_t bytes,
SExtTagsInfo *pTagInfo, char *pTags, int16_t stage) {
TailUnit **pList = pInfo->res;
if (pInfo->num < maxLen) {
valueTailAssign(pList[pInfo->num], bytes, pData, ts, pTagInfo, pTags, stage);
taosheapsort((void *) pList, sizeof(TailUnit **), pInfo->num + 1, NULL, tailComparFn, NULL, tailSwapFn, 0);
pInfo->num++;
} else if(pList[0]->timestamp < ts) {
valueTailAssign(pList[0], bytes, pData, ts, pTagInfo, pTags, stage);
taosheapadjust((void *) pList, sizeof(TailUnit **), 0, maxLen - 1, NULL, tailComparFn, NULL, tailSwapFn, 0);
}
}
static bool tail_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) {
return false;
}
STailInfo *pInfo = getOutputInfo(pCtx);
buildTailStruct(pInfo, pCtx);
return true;
}
static void tail_function(SQLFunctionCtx *pCtx) {
STailInfo *pRes = getOutputInfo(pCtx);
// if (pCtx->stableQuery){
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
do_tail_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts,
pCtx->inputBytes, &pCtx->tagInfo, NULL, pCtx->currentStage);
}
// }else{
// for (int32_t i = pCtx->size - 1; i >= 0; --i) {
// if (pRes->offset++ < (int32_t)pCtx->param[1].i64){
// continue;
// }
// if (pRes->num >= (int32_t)(pCtx->param[0].i64 - pCtx->param[1].i64)){ // query complete
// pCtx->resultInfo->complete = true;
// for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
// SQLFunctionCtx *ctx = pCtx->tagInfo.pTagCtxList[j];
// ctx->resultInfo->complete = true;
// }
// break;
// }
// char *data = GET_INPUT_DATA(pCtx, i);
//
// TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
//
// valueTailAssign(pRes->res[pRes->num], pCtx->inputBytes, data, ts, &pCtx->tagInfo, NULL, pCtx->currentStage);
//
// pRes->num++;
// }
// }
// treat the result as only one result
GET_RES_INFO(pCtx)->numOfRes = 1;
}
static void tail_func_merge(SQLFunctionCtx *pCtx) {
STailInfo *pInput = (STailInfo *)GET_INPUT_DATA_LIST(pCtx);
// construct the input data struct from binary data
buildTailStruct(pInput, pCtx);
STailInfo *pOutput = getOutputInfo(pCtx);
// the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) {
do_tail_function_add(pOutput, (int32_t)pCtx->param[0].i64, pInput->res[i]->data, pInput->res[i]->timestamp,
pCtx->outputBytes, &pCtx->tagInfo, pInput->res[i]->data + pCtx->outputBytes, pCtx->currentStage);
}
GET_RES_INFO(pCtx)->numOfRes = pOutput->num;
}
static void tail_func_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// data in temporary list is less than the required number of results, not enough qualified number of results
STailInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
int32_t bytes = 0;
int32_t type = 0;
if (pCtx->currentStage == MERGE_STAGE) {
bytes = pCtx->outputBytes;
type = pCtx->outputType;
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
} else {
bytes = pCtx->inputBytes;
type = pCtx->inputType;
}
// if(pCtx->stableQuery){
GET_RES_INFO(pCtx)->numOfRes = pRes->num - pCtx->param[1].i64;
// }else{
// GET_RES_INFO(pCtx)->numOfRes = pRes->num;
// }
if (GET_RES_INFO(pCtx)->numOfRes <= 0) return;
taosqsort(pRes->res, pRes->num, POINTER_BYTES, NULL, tailComparFn);
size_t size = sizeof(int64_t) + bytes + pCtx->tagInfo.tagsLen;
void *data = calloc(size, GET_RES_INFO(pCtx)->numOfRes);
if(!data){
qError("calloc error in tail_func_finalizer: size:%d, num:%d", (int32_t)size, GET_RES_INFO(pCtx)->numOfRes);
return;
}
for(int32_t i = 0; i < GET_RES_INFO(pCtx)->numOfRes; i++){
memcpy(data + i * size, pRes->res[i], size);
}
SortSupporter support = {0};
// user specify the order of output by sort the result according to timestamp
if (pCtx->param[2].i64 != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
support.dataOffset = sizeof(int64_t);
support.comparFn = getComparFunc(type, 0);
taosqsort(data, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn);
}
copyRes(pCtx, data, bytes);
free(data);
doFinalizer(pCtx);
}
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
/* /*
* function compatible list. * function compatible list.
...@@ -5422,8 +5647,8 @@ int32_t functionCompatList[] = { ...@@ -5422,8 +5647,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, // tid_tag, deriv, csum, mavg, sample,
6, 8, -1, -1, -1, 6, 8, -1, -1, -1,
// block_info,elapsed,histogram,unique,mode // block_info,elapsed,histogram,unique,mode,tail
7, 1, -1, -1, 1 7, 1, -1, -1, 1, -1
}; };
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
...@@ -5920,5 +6145,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ ...@@ -5920,5 +6145,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
mode_func_finalizer, mode_func_finalizer,
mode_function_merge, mode_function_merge,
dataBlockRequired, dataBlockRequired,
} },
{
// 41
"tail",
TSDB_FUNC_TAIL,
TSDB_FUNC_TAIL,
TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY,
tail_function_setup,
tail_function,
tail_func_finalizer,
tail_func_merge,
tailFuncRequired,
}
}; };
...@@ -47,11 +47,6 @@ ...@@ -47,11 +47,6 @@
#define MULTI_KEY_DELIM "-" #define MULTI_KEY_DELIM "-"
#define TIME_WINDOW_COPY(_dst, _src) do {\
(_dst).skey = (_src).skey;\
(_dst).ekey = (_src).ekey;\
} while (0)
enum { enum {
TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1, TS_JOIN_TS_NOT_EQUALS = 1,
...@@ -288,48 +283,28 @@ static int compareRowData(const void *a, const void *b, const void *userData) { ...@@ -288,48 +283,28 @@ static int compareRowData(const void *a, const void *b, const void *userData) {
return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
} }
static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock, SQLFunctionCtx *pCtx) { static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) {
int32_t size = pRuntimeEnv->pQueryAttr->pGroupbyExpr == NULL? 0: pRuntimeEnv->pQueryAttr->pGroupbyExpr->numOfGroupCols; if (pRuntimeEnv->pQueryAttr->pGroupbyExpr == NULL || pRuntimeEnv->pQueryAttr->pGroupbyExpr->numOfGroupCols <= 0){
if (pRuntimeEnv->pQueryAttr->interval.interval > 0) size++;
if (size <= 0) {
return; return;
} }
int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId; if (pRuntimeEnv->pQueryAttr->order.orderColId <= 0){
if (orderId <= 0) {
return; return;
} }
int32_t orderIndex = -1; SColIndex* pColIndex = taosArrayGet(pRuntimeEnv->pQueryAttr->pGroupbyExpr->columnInfo, 0);
for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) {
if (pCtx[j].colId == orderId) {
orderIndex = j;
break;
}
}
if (orderIndex < 0) {
return;
}
bool found = false;
int16_t dataOffset = 0; int16_t dataOffset = 0;
int16_t type = 0;
for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) { for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock->pDataBlock, j); SColumnInfoData* pColInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock->pDataBlock, j);
if (orderIndex == j) { if (pColInfoData->info.colId == pColIndex->colId) {
found = true; type = pColInfoData->info.type;
break; break;
} }
dataOffset += pColInfoData->info.bytes; dataOffset += pColInfoData->info.bytes;
} }
if (found == false) {
return;
}
int16_t type = pRuntimeEnv->pQueryAttr->pExpr1[orderIndex].base.resType;
SRowCompSupporter support = {.pRuntimeEnv = pRuntimeEnv, .dataOffset = dataOffset, .comFunc = getComparFunc(type, 0)}; SRowCompSupporter support = {.pRuntimeEnv = pRuntimeEnv, .dataOffset = dataOffset, .comFunc = getComparFunc(type, 0)};
taosArraySortPWithExt(pGroupResInfo->pRows, compareRowData, &support); taosArraySortPWithExt(pGroupResInfo->pRows, compareRowData, &support);
} }
...@@ -1962,7 +1937,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -1962,7 +1937,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->inputType = pSqlExpr->colType; pCtx->inputType = pSqlExpr->colType;
if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX && if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX &&
(pSqlExpr->functionId == TSDB_FUNC_UNIQUE || pSqlExpr->functionId == TSDB_FUNC_MODE)){ (pSqlExpr->functionId == TSDB_FUNC_UNIQUE || pSqlExpr->functionId == TSDB_FUNC_MODE
|| pSqlExpr->functionId == TSDB_FUNC_TAIL)){
pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal; pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal;
}else{ }else{
pCtx->inputBytes = pSqlExpr->colBytes; pCtx->inputBytes = pSqlExpr->colBytes;
...@@ -2001,16 +1977,15 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -2001,16 +1977,15 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
int32_t functionId = pCtx->functionId; int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM
|| functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_UNIQUE) { || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_UNIQUE
|| functionId == TSDB_FUNC_TAIL) {
int32_t f = pExpr[i-1].base.functionId; int32_t f = pExpr[i-1].base.functionId;
assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY);
pCtx->param[2].i64 = pQueryAttr->order.order; pCtx->param[3].i64 = pQueryAttr->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[3].i64 = functionId;
pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i64 = pQueryAttr->order.orderColId; pCtx->param[2].i64 = pQueryAttr->order.orderColId;
} else if (functionId == TSDB_FUNC_INTERP) { } else if (functionId == TSDB_FUNC_INTERP) {
pCtx->param[2].i64 = (int8_t)pQueryAttr->fillType; pCtx->param[2].i64 = (int8_t)pQueryAttr->fillType;
if (pQueryAttr->fillVal != NULL) { if (pQueryAttr->fillVal != NULL) {
...@@ -3187,7 +3162,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -3187,7 +3162,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
if ((*status) != BLK_DATA_ALL_NEEDED) { if ((*status) != BLK_DATA_ALL_NEEDED) {
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && (!pQueryAttr->pointInterpQuery) && (!pQueryAttr->uniqueQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && (!pQueryAttr->pointInterpQuery)) {
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
...@@ -3199,7 +3174,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -3199,7 +3174,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery) && (!pQueryAttr->pointInterpQuery) && (!pQueryAttr->uniqueQuery)) { // stable aggregate, not interval aggregate or normal column aggregate } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery) && (!pQueryAttr->pointInterpQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput,
pRuntimeEnv->current->groupIndex); pRuntimeEnv->current->groupIndex);
...@@ -3698,7 +3673,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i ...@@ -3698,7 +3673,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
// set the timestamp output buffer for top/bottom/diff query // set the timestamp output buffer for top/bottom/diff query
int32_t fid = pCtx[i].functionId; int32_t fid = pCtx[i].functionId;
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE || if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE ||
fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM || fid == TSDB_FUNC_UNIQUE) { fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM || fid == TSDB_FUNC_UNIQUE ||
fid == TSDB_FUNC_TAIL) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
} else if (fid == TSDB_FUNC_INTERP) { } else if (fid == TSDB_FUNC_INTERP) {
assert(pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pCtx[0].functionId == TSDB_FUNC_TS); assert(pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pCtx[0].functionId == TSDB_FUNC_TS);
...@@ -3769,7 +3745,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3769,7 +3745,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_UNIQUE) { functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_UNIQUE ||
functionId == TSDB_FUNC_TAIL) {
if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
} else if (functionId == TSDB_FUNC_INTERP) { } else if (functionId == TSDB_FUNC_INTERP) {
assert(pBInfo->pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pBInfo->pCtx[0].functionId == TSDB_FUNC_TS); assert(pBInfo->pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pBInfo->pCtx[0].functionId == TSDB_FUNC_TS);
...@@ -3945,15 +3922,6 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult ...@@ -3945,15 +3922,6 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
} }
} }
bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs) {
for (int32_t i = 0; i < numOfOutput; ++i) {
if (pExprs[i].base.functionId == TSDB_FUNC_UNIQUE) {
return true;
}
}
return false;
}
static bool hasMainOutput(SQueryAttr *pQueryAttr) { static bool hasMainOutput(SQueryAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = pQueryAttr->pExpr1[i].base.functionId; int32_t functionId = pQueryAttr->pExpr1[i].base.functionId;
...@@ -4044,7 +4012,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe ...@@ -4044,7 +4012,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE ||
functionId == TSDB_FUNC_UNIQUE) { functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL) {
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
} }
...@@ -4114,7 +4082,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF ...@@ -4114,7 +4082,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_UNIQUE) { functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_UNIQUE ||
functionId == TSDB_FUNC_TAIL) {
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
} }
...@@ -5104,14 +5073,13 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { ...@@ -5104,14 +5073,13 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
.numOfCols = pQueryAttr->numOfCols, .numOfCols = pQueryAttr->numOfCols,
.type = BLOCK_LOAD_OFFSET_SEQ_ORDER, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER,
.loadExternalRows = false, .loadExternalRows = false,
.twindow = *win,
}; };
// set offset with // set offset with
if(pQueryAttr->skipOffset) { if(pQueryAttr->skipOffset) {
cond.offset = pQueryAttr->limit.offset; cond.offset = pQueryAttr->limit.offset;
} }
TIME_WINDOW_COPY(cond.twindow, *win);
return cond; return cond;
} }
...@@ -5279,8 +5247,6 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { ...@@ -5279,8 +5247,6 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey);
pRuntimeEnv->scanFlag = REVERSE_SCAN;
pTableScanInfo->times = 1; pTableScanInfo->times = 1;
pTableScanInfo->current = 0; pTableScanInfo->current = 0;
pTableScanInfo->reverseTimes = 0; pTableScanInfo->reverseTimes = 0;
...@@ -6876,7 +6842,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6876,7 +6842,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
STableId prevId = {0, 0}; STableId prevId = {0, 0};
...@@ -6906,7 +6871,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -6906,7 +6871,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pQueryAttr->order.order = order; // TODO : restore the order pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv); doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
...@@ -6927,7 +6892,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -6927,7 +6892,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
int16_t bytes = pColInfoData->info.bytes; int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type; // int16_t type = pColInfoData->info.type;
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData; TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
...@@ -6939,9 +6904,9 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -6939,9 +6904,9 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
pInfo->numOfRows = 0; pInfo->numOfRows = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j; char* val = ((char*)pColInfoData->pData) + bytes * j;
if (isNull(val, type)) { // if (isNull(val, type)) {
continue; // continue;
} // }
if (pInfo->prevData == NULL) { if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes); pInfo->prevData = malloc(bytes);
memcpy(pInfo->prevData, val, bytes); memcpy(pInfo->prevData, val, bytes);
...@@ -7162,7 +7127,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -7162,7 +7127,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo);
if (!pRuntimeEnv->pQueryAttr->stableQuery) { if (!pRuntimeEnv->pQueryAttr->stableQuery) {
sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes, pInfo->binfo.pCtx); sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
} }
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
...@@ -9006,7 +8971,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol ...@@ -9006,7 +8971,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t functId = pExprs[i].base.functionId; int16_t functId = pExprs[i].base.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE || functId == TSDB_FUNC_UNIQUE) { if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE || functId == TSDB_FUNC_UNIQUE || functId == TSDB_FUNC_TAIL) {
int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols);
if (j < 0 || j >= pTableInfo->numOfCols) { if (j < 0 || j >= pTableInfo->numOfCols) {
return TSDB_CODE_QRY_INVALID_MSG; return TSDB_CODE_QRY_INVALID_MSG;
...@@ -9591,7 +9556,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -9591,7 +9556,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->vgId = vgId; pQueryAttr->vgId = vgId;
pQueryAttr->pFilters = pFilters; pQueryAttr->pFilters = pFilters;
pQueryAttr->range = pQueryMsg->range; pQueryAttr->range = pQueryMsg->range;
pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pExprs);
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQueryAttr->tableCols == NULL) { if (pQueryAttr->tableCols == NULL) {
......
...@@ -38,12 +38,14 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo ...@@ -38,12 +38,14 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM) { pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TAIL) {
return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64; return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64;
} }
}
if (pQueryAttr->uniqueQuery){ if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_UNIQUE){
return MAX_UNIQUE_RESULT_ROWS; return MAX_UNIQUE_RESULT_ROWS;
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册