提交 f791d482 编写于 作者: wmmhello's avatar wmmhello

finish basic function for tail

上级 26ebf798
......@@ -3283,22 +3283,23 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd),
resultSize, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
if (functionId == TSDB_FUNC_TAIL){
int64_t offset = 0;
if (taosArrayGetSize(pItem->pNode->Expr.paramList) == 3){
tSqlExprItem* para = taosArrayGet(pItem->pNode->Expr.paramList, 2);
if (para->pNode->tokenId == TK_ID || para->pNode->value.nType != TSDB_DATA_TYPE_BIGINT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
pVariant = &para->pNode->value;
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
int64_t offset = GET_INT64_VAL(val);
offset = para->pNode->value.i64;
if (offset < 0 || offset > 100) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg30);
}
}else{
GET_INT64_VAL(val) = 0;
}
GET_INT64_VAL(val) = numRowsSelected + offset;
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
GET_INT64_VAL(val) = offset;
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
}else{
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t));
}
}
......
......@@ -5120,7 +5120,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
}
}
pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pQueryAttr->pExpr1);
pQueryAttr->uniqueQuery = isFunctionQuery(numOfOutput, pQueryAttr->pExpr1, TSDB_FUNC_UNIQUE);
pQueryAttr->tailQuery = isFunctionQuery(numOfOutput, pQueryAttr->pExpr1, TSDB_FUNC_TAIL);
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
......
......@@ -224,6 +224,7 @@ typedef struct SQueryAttr {
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool uniqueQuery;
bool tailQuery;
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
......@@ -734,5 +735,5 @@ void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows);
// tsdb scan table callback table or query is over. param is SQueryRuntimeEnv*
bool qReadOverCB(void* param, int8_t type, int32_t tid);
bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs);
bool isFunctionQuery(int32_t numOfOutput, SExprInfo* pExprs, int16_t functionId);
#endif // TDENGINE_QEXECUTOR_H
......@@ -5467,10 +5467,10 @@ static int32_t tailComparFn(const void *p1, const void *p2, const void *param) {
static void tailSwapFn(void *dst, void *src, const void *param)
{
tValuePair **vdst = (tValuePair **) dst;
tValuePair **vsrc = (tValuePair **) src;
TailUnit **vdst = (TailUnit **) dst;
TailUnit **vsrc = (TailUnit **) src;
tValuePair *tmp = *vdst;
TailUnit *tmp = *vdst;
*vdst = *vsrc;
*vsrc = tmp;
}
......@@ -5485,7 +5485,7 @@ static void do_tail_function_add(STailInfo *pInfo, int32_t maxLen, void *pData,
taosheapsort((void *) pList, sizeof(TailUnit **), pInfo->num + 1, NULL, tailComparFn, NULL, tailSwapFn, 0);
pInfo->num++;
} else {
} else if(pList[0]->timestamp < ts) {
valueTailAssign(pList[0], bytes, pData, ts, pTagInfo, pTags, stage);
taosheapadjust((void *) pList, sizeof(TailUnit **), 0, maxLen - 1, NULL, tailComparFn, NULL, tailSwapFn, 0);
}
......@@ -5504,36 +5504,36 @@ static bool tail_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
static void tail_function(SQLFunctionCtx *pCtx) {
STailInfo *pRes = getOutputInfo(pCtx);
if (pCtx->stableQuery){
// if (pCtx->stableQuery){
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
do_tail_function_add(pRes, (int32_t)(pCtx->param[0].i64 + pCtx->param[1].i64), data, ts,
do_tail_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts,
pCtx->inputBytes, &pCtx->tagInfo, NULL, pCtx->currentStage);
}
}else{
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
if (pRes->offset++ < (int32_t)pCtx->param[1].i64){
continue;
}
if (pRes->num >= (int32_t)pCtx->param[0].i64){ // query complete
pCtx->resultInfo->complete = true;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
SQLFunctionCtx *ctx = pCtx->tagInfo.pTagCtxList[j];
ctx->resultInfo->complete = true;
}
break;
}
char *data = GET_INPUT_DATA(pCtx, i);
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
valueTailAssign(pRes->res[pRes->num], pCtx->inputBytes, data, ts, &pCtx->tagInfo, NULL, pCtx->currentStage);
pRes->num++;
}
}
// }else{
// for (int32_t i = pCtx->size - 1; i >= 0; --i) {
// if (pRes->offset++ < (int32_t)pCtx->param[1].i64){
// continue;
// }
// if (pRes->num >= (int32_t)(pCtx->param[0].i64 - pCtx->param[1].i64)){ // query complete
// pCtx->resultInfo->complete = true;
// for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
// SQLFunctionCtx *ctx = pCtx->tagInfo.pTagCtxList[j];
// ctx->resultInfo->complete = true;
// }
// break;
// }
// char *data = GET_INPUT_DATA(pCtx, i);
//
// TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
//
// valueTailAssign(pRes->res[pRes->num], pCtx->inputBytes, data, ts, &pCtx->tagInfo, NULL, pCtx->currentStage);
//
// pRes->num++;
// }
// }
// treat the result as only one result
GET_RES_INFO(pCtx)->numOfRes = 1;
......@@ -5549,7 +5549,7 @@ static void tail_func_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) {
do_tail_function_add(pOutput, (int32_t)(pCtx->param[0].i64 + pCtx->param[1].i64), pInput->res[i]->data, pInput->res[i]->timestamp,
do_tail_function_add(pOutput, (int32_t)pCtx->param[0].i64, pInput->res[i]->data, pInput->res[i]->timestamp,
pCtx->outputBytes, &pCtx->tagInfo, pInput->res[i]->data + pCtx->outputBytes, pCtx->currentStage);
}
......@@ -5562,34 +5562,25 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) {
// data in temporary list is less than the required number of results, not enough qualified number of results
STailInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
if(pCtx->stableQuery){
GET_RES_INFO(pCtx)->numOfRes = pRes->num - pCtx->param[1].i64;
}else{
GET_RES_INFO(pCtx)->numOfRes = pRes->num;
}
int32_t bytes = 0;
int32_t type = 0;
int32_t start = 0;
if (pCtx->currentStage == MERGE_STAGE) {
bytes = pCtx->outputBytes;
type = pCtx->outputType;
start = pCtx->param[1].i64;
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
} else {
bytes = pCtx->inputBytes;
type = pCtx->inputType;
}
SortSupporter support = {0};
// user specify the order of output by sort the result according to timestamp
if (pCtx->param[2].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
support.dataOffset = 0;
support.comparFn = compareInt64Val;
} else{
support.dataOffset = sizeof(int64_t);
support.comparFn = getComparFunc(type, 0);
}
// if(pCtx->stableQuery){
GET_RES_INFO(pCtx)->numOfRes = pRes->num - pCtx->param[1].i64;
// }else{
// GET_RES_INFO(pCtx)->numOfRes = pRes->num;
// }
if (GET_RES_INFO(pCtx)->numOfRes <= 0) return;
taosqsort(pRes->res, pRes->num, POINTER_BYTES, NULL, tailComparFn);
size_t size = sizeof(int64_t) + bytes + pCtx->tagInfo.tagsLen;
void *data = calloc(size, GET_RES_INFO(pCtx)->numOfRes);
......@@ -5597,10 +5588,18 @@ static void tail_func_finalizer(SQLFunctionCtx *pCtx) {
qError("calloc error in tail_func_finalizer: size:%d, num:%d", (int32_t)size, GET_RES_INFO(pCtx)->numOfRes);
return;
}
for(int32_t i = 0; start < pRes->num; start++, i++){
memcpy(data + i * size, pRes->res[start], size);
for(int32_t i = 0; i < GET_RES_INFO(pCtx)->numOfRes; i++){
memcpy(data + i * size, pRes->res[i], size);
}
SortSupporter support = {0};
// user specify the order of output by sort the result according to timestamp
if (pCtx->param[2].i64 != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
support.dataOffset = sizeof(int64_t);
support.comparFn = getComparFunc(type, 0);
taosqsort(data, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn);
}
taosqsort(data, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn);
copyRes(pCtx, data, bytes);
free(data);
doFinalizer(pCtx);
......
......@@ -2602,7 +2602,7 @@ static bool onlyOneQueryType(SQueryAttr *pQueryAttr, int32_t functId, int32_t fu
static bool onlyFirstQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, TSDB_FUNC_FIRST, TSDB_FUNC_FIRST_DST); }
static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST) || onlyOneQueryType(pQueryAttr, TSDB_FUNC_TAIL, TSDB_FUNC_TAIL); }
static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
static bool notContainSessionOrStateWindow(SQueryAttr *pQueryAttr) { return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); }
......@@ -3941,9 +3941,9 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
}
}
bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs) {
bool isFunctionQuery(int32_t numOfOutput, SExprInfo* pExprs, int16_t functionId) {
for (int32_t i = 0; i < numOfOutput; ++i) {
if (pExprs[i].base.functionId == TSDB_FUNC_UNIQUE) {
if (pExprs[i].base.functionId == functionId) {
return true;
}
}
......@@ -9587,7 +9587,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->vgId = vgId;
pQueryAttr->pFilters = pFilters;
pQueryAttr->range = pQueryMsg->range;
pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pExprs);
pQueryAttr->uniqueQuery = isFunctionQuery(numOfOutput, pExprs, TSDB_FUNC_UNIQUE);
pQueryAttr->tailQuery = isFunctionQuery(numOfOutput, pExprs, TSDB_FUNC_TAIL);
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQueryAttr->tableCols == NULL) {
......
......@@ -38,12 +38,10 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM) {
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TAIL) {
return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64;
}
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TAIL) {
return (int32_t)(pQueryAttr->pExpr1[i].base.param[0].i64 + pQueryAttr->pExpr1[i].base.param[1].i64);
}
}
if (pQueryAttr->uniqueQuery){
return MAX_UNIQUE_RESULT_ROWS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册