提交 182936c4 编写于 作者: Y yihaoDeng

[TD-2570]<feature> support state window

上级 ff3a4c64
......@@ -226,7 +226,7 @@ typedef struct SQueryInfo {
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
bool distinctTag; // distinct tag or not
bool windowState; // window state or not
bool stateWindow; // window state or not
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
......
......@@ -89,7 +89,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm
static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t validateWindowStateNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable);
static int32_t validateStateWindowNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
......@@ -829,34 +829,58 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
// The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
}
static int32_t validateWindowStateNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) {
static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) {
const char* msg1 = "invalid column name";
const char* msg2 = "invalid window state";
const char* msg3 = "not support window_state on super table";
const char* msg3 = "not support state window on super table/tag column";
const char* msg4 = "not support state_window with group by ";
SStrToken *col = &(pSqlNode->windowstateVal.col) ;
if (col->z == NULL || col->n <= 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
return TSDB_CODE_SUCCESS;
}
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
pQueryInfo->groupbyExpr.numOfGroupCols = 1;
//TODO(dengyihao): check tag column
if (isStable) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSqlExprNumOfExprs(pQueryInfo) == 1) {
SSqlExpr* pExpr = &(tscSqlExprGet(pQueryInfo, 0)->base);
if (index.columnIndex == pExpr->colInfo.colIndex && pExpr->colType == TSDB_DATA_TYPE_INT) {
pQueryInfo->windowState = true;
return TSDB_CODE_SUCCESS;
}
}
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || index.columnIndex >= numOfCols) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
SSqlGroupbyExpr* pGroupExpr = &pQueryInfo->groupbyExpr;
if (pGroupExpr->columnInfo == NULL) {
pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex));
}
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema);
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
pQueryInfo->stateWindow = true;
return TSDB_CODE_SUCCESS;
}
int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) {
......@@ -7319,7 +7343,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (validateWindowStateNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) {
// parse the window_state
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// set order by info
......@@ -7344,7 +7369,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// parse the window_state
/*
* transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last
......
......@@ -875,6 +875,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->stateWindow = query.stateWindow;
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->sqlstrLen = htonl(sqlLen);
......
......@@ -3545,12 +3545,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->distinctTag = pQueryInfo->distinctTag;
pQueryAttr->windowState = pQueryInfo->windowState;
pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
......@@ -3558,8 +3559,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->slimit = pQueryInfo->slimit;
pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window;
......
......@@ -471,6 +471,7 @@ typedef struct {
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan
bool stateWindow; // state window flag
STimeWindow window;
int32_t numOfTables;
......
......@@ -196,7 +196,7 @@ typedef struct SQueryAttr {
bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan
bool distinctTag; // distinct tag query
bool windowState; // window State on sub/normal table
bool stateWindow; // window State on sub/normal table
int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
......
......@@ -3298,8 +3298,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
if (pCtx->numOfParams == 2) {
return;
}
if (pCtx->param[0].i64 == 1) {
SET_VAL(pCtx, pCtx->size, 1);
} else {
INC_INIT_VAL(pCtx, pCtx->size);
}
INC_INIT_VAL(pCtx, pCtx->size);
char *pData = GET_INPUT_DATA_LIST(pCtx);
if (pCtx->order == TSDB_ORDER_ASC) {
......
......@@ -190,7 +190,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator);
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *bInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
......@@ -727,7 +727,6 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
......@@ -1295,7 +1294,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
}
int32_t ret =
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
......@@ -1336,7 +1335,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows += 1;
pInfo->start = j;
//pInfo->start = j;
} else { // start a new session window
SResultRow* pResult = NULL;
......@@ -1387,12 +1386,12 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
}
}
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t *rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &pInfo->binfo.resultRowInfo;
SQLFunctionCtx *pCtx = pInfo->binfo.pCtx;
int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo;
SQLFunctionCtx *pCtx = binfo->pCtx;
// not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData;
......@@ -3143,7 +3142,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfOutput = pOperator->numOfOutput;
if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0) {
if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
// for each group result, call the finalize function for each column
if (pQueryAttr->groupbyColumn) {
closeAllResultRows(pResultRowInfo);
......@@ -5063,37 +5062,37 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
SOptrBasicInfo* pBInfo = &pInfo->binfo;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_TIMESTAMP) {
qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
return;
}
pInfo->numOfRows = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j;
if (isNull(val, type)) {
continue;
}
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
pInfo->prevData = malloc(bytes);
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
} else if (0 == memcmp(pInfo->prevData, val, bytes)) {
pInfo->start = j;
} else if (memcmp(pInfo->prevData, val, bytes) == 0) {
pInfo->curWindow.ekey = tsList[j];
pInfo->numOfRows += 1;
pInfo->start = j;
pInfo->start = j;
} else {
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
......@@ -5102,7 +5101,6 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
......@@ -5111,7 +5109,30 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->start = j;
}
}
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
//if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) {
// if (pInfo->prevData == NULL) {
// pInfo->prevData = malloc(bytes);
// }
// pInfo->curWindow.skey = tsList[j];
// pInfo->curWindow.ekey = tsList[j];
// memcpy(pInfo->prevData, val, bytes);
// int32_t ret =
// setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, (char *)&v, type, bytes, item->groupIndex);
// if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
// }
//}
//for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
// pInfo->binfo.pCtx[k].size = 1;
// int32_t functionId = pInfo->binfo.pCtx[k].functionId;
// if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) {
// aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j);
// }
//}
}
SResultRow* pResult = NULL;
......@@ -5124,25 +5145,28 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
}
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream;
......@@ -5153,7 +5177,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
}
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order);
if (pWindowInfo->colIndex == -1) {
pWindowInfo->colIndex = pOperator->pExpr->base.colInfo.colIndex;
pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
}
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
}
......@@ -6965,6 +6989,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
pQueryAttr->simpleAgg = pQueryMsg->simpleAgg;
pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery;
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
pQueryAttr->stateWindow = pQueryMsg->stateWindow;
pQueryAttr->vgId = vgId;
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
......
......@@ -113,6 +113,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
op = OP_SessionWindow;
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->stateWindow) {
op = OP_StateWindow;
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
......@@ -121,10 +129,6 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) {
op = OP_MultiTableAggregate;
} else {
if (pQueryAttr->windowState) {
op = OP_StateWindow;
taosArrayPush(plan, &op);
}
op = OP_Aggregate;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册