提交 3b6ed920 编写于 作者: wmmhello's avatar wmmhello

finish basic logic for state function

上级 e5ddcd90
......@@ -2804,7 +2804,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// no parameters or more than one parameter for function
if (pItem->pNode->Expr.paramList == NULL ||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && functionId != TSDB_FUNC_ELAPSED &&
functionId != TSDB_FUNC_DIFF && numOfParams != 1) ||
functionId != TSDB_FUNC_DIFF && functionId != TSDB_FUNC_STATE_COUNT && functionId != TSDB_FUNC_STATE_DURATION && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3) ||
(functionId == TSDB_FUNC_ELAPSED && numOfParams != 1 && numOfParams != 2) ||
(functionId == TSDB_FUNC_DIFF && numOfParams != 1 && numOfParams != 2) ||
......@@ -2879,8 +2879,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
// set the first column ts for diff query
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_CSUM ||
functionId == TSDB_FUNC_STATE_COUNT || functionId == TSDB_FUNC_STATE_DURATION) {
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_CSUM) {
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
TSDB_KEYSIZE, 0, TSDB_KEYSIZE, false);
......@@ -2892,10 +2891,19 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
if (functionId == TSDB_FUNC_STATE_COUNT || functionId == TSDB_FUNC_STATE_DURATION) {
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_PRJ, &index, pSchema->type,
pSchema->bytes, getNewResColId(pCmd), intermediateResSize, false);
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_PRJ, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
TSDB_KEYSIZE, 0, TSDB_KEYSIZE, false);
tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS_DUMMY].name, sizeof(pExpr->base.aliasName));
SColumnList ids = createColumnList(1, 0, 0);
insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
aAggs[TSDB_FUNC_TS].name, pExpr);
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_PRJ, &index, pSchema->type,
pSchema->bytes, getNewResColId(pCmd), 0, false);
tstrncpy(pExpr->base.aliasName, pParamElem->pNode->columnName.z, pParamElem->pNode->columnName.n+1);
ids = createColumnList(1, index.tableIndex, index.columnIndex);
insertResultField(pQueryInfo, colIndex + 1, &ids, pExpr->base.resBytes, (int32_t)pExpr->base.resType,
pExpr->base.aliasName, pExpr);
}
......@@ -2984,7 +2992,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
tscExprAddParams(&pExpr->base, pParamElem[1].pNode->columnName.z, TSDB_DATA_TYPE_BINARY, pParamElem[1].pNode->columnName.n);
if (pParamElem[2].pNode->tokenId != TK_INTEGER || pParamElem[2].pNode->tokenId != TK_FLOAT) {
if (pParamElem[2].pNode->tokenId != TK_INTEGER && pParamElem[2].pNode->tokenId != TK_FLOAT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
tVariantAssign(&pExpr->base.param[pExpr->base.numOfParams++], &pParamElem[2].pNode->value);
......@@ -8209,7 +8217,8 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
int16_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_SCALAR_EXPR || functionId == TSDB_FUNC_TS_DUMMY) {
functionId == TSDB_FUNC_SCALAR_EXPR || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION) {
continue;
}
......
......@@ -273,8 +273,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ ||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION)
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP)
{
*type = (int16_t)dataType;
*bytes = dataBytes;
......@@ -646,7 +645,7 @@ bool isValidStateOper(char *oper, int32_t len){
strncmp(oper, "ne", len) || strncmp(oper, "eq", len);
}
#define STATEOPER(OPER, COMP, TYPE) if (strncmp(oper->pz, OPER, oper->nLen)) {\
#define STATEOPER(OPER, COMP, TYPE) if (strncmp(oper->pz, OPER, oper->nLen) == 0) {\
if (pVar->nType == TSDB_DATA_TYPE_BIGINT && *(TYPE)data COMP pVar->i64) return true;\
else if(pVar->nType == TSDB_DATA_TYPE_DOUBLE && *(TYPE)data COMP pVar->dKey) return true;\
else return false;}
......@@ -702,7 +701,7 @@ static bool isStateOperTrue(void *data, int16_t type, tVariant *oper, tVariant *
}
case TSDB_DATA_TYPE_UTINYINT: {
STATEJUDGE(uint16_t *)
STATEJUDGE(uint8_t *)
break;
}
default:
......@@ -5718,33 +5717,19 @@ static void state_count_function(SQLFunctionCtx *pCtx) {
SStateInfo *pStateInfo = GET_ROWCELL_INTERBUF(pResInfo);
void *data = GET_INPUT_DATA_LIST(pCtx);
int32_t notNullElems = 0;
TSKEY* pTimestamp = pCtx->ptsOutputBuf;
TSKEY* tsList = GET_TS_LIST(pCtx);
int64_t *pOutput = (int64_t *)pCtx->pOutput;
for (int32_t i = 0; i < pCtx->size; i ++) {
for (int32_t i = 0; i < pCtx->size; i++,pOutput++,data += pCtx->inputBytes) {
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
setNull(pOutput, TSDB_DATA_TYPE_BIGINT, 0);
continue;
}
if (isStateOperTrue(data, pCtx->inputType, &pCtx->param[0], &pCtx->param[1])){
if ((pStateInfo->countPrev == 0 || pStateInfo->countPrev == -1)) {
*pOutput = 1;
pStateInfo->countPrev = 1;
}else{
*pOutput = ++pStateInfo->countPrev;
}
*pOutput = ++pStateInfo->countPrev;
}else{
*pOutput = -1;
pStateInfo->countPrev = 0;
}
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
data += i * pCtx->inputBytes;
notNullElems++;
}
for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
......@@ -5753,7 +5738,7 @@ static void state_count_function(SQLFunctionCtx *pCtx) {
aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
}
}
pResInfo->numOfRes += notNullElems;
pResInfo->numOfRes += pCtx->size;
}
static void state_duration_function(SQLFunctionCtx *pCtx) {
......@@ -5761,35 +5746,25 @@ static void state_duration_function(SQLFunctionCtx *pCtx) {
SStateInfo *pStateInfo = GET_ROWCELL_INTERBUF(pResInfo);
void *data = GET_INPUT_DATA_LIST(pCtx);
int32_t notNullElems = 0;
TSKEY* pTimestamp = pCtx->ptsOutputBuf;
TSKEY* tsList = GET_TS_LIST(pCtx);
int64_t *pOutput = (int64_t *)pCtx->pOutput;
for (int32_t i = 0; i < pCtx->size; i ++) {
for (int32_t i = 0; i < pCtx->size; i++,pOutput++,data += pCtx->inputBytes) {
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
setNull(pOutput, TSDB_DATA_TYPE_BIGINT, 0);
continue;
}
*pTimestamp = (tsList != NULL)? tsList[i]:0;
if (isStateOperTrue(data, pCtx->inputType, &pCtx->param[0], &pCtx->param[1])){
if (pStateInfo->durationStart == 0) {
*pOutput = 0;
pStateInfo->durationStart = *pTimestamp;
pStateInfo->durationStart = tsList[i];
} else {
*pOutput = (*pTimestamp - pStateInfo->durationStart)/pCtx->param[2].i64;
*pOutput = (tsList[i] - pStateInfo->durationStart)/pCtx->param[2].i64;
}
} else{
*pOutput = -1;
pStateInfo->durationStart = 0;
}
pOutput += 1;
pTimestamp += 1;
data += i * pCtx->inputBytes;
notNullElems++;
}
for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
......@@ -5798,7 +5773,7 @@ static void state_duration_function(SQLFunctionCtx *pCtx) {
aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
}
}
pResInfo->numOfRes += notNullElems;
pResInfo->numOfRes += pCtx->size;
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
......@@ -5821,7 +5796,7 @@ int32_t functionCompatList[] = {
// tid_tag, deriv, csum, mavg, sample,
6, 8, -1, -1, -1,
// block_info,elapsed,histogram,unique,mode,tail, stateCount, stateDuration
7, 1, -1, -1, 1, -1, -1, -1,
7, 1, -1, -1, 1, -1, 1, 1,
};
SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
......@@ -6336,7 +6311,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
"stateCount",
TSDB_FUNC_STATE_COUNT,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
function_setup,
state_count_function,
doFinalizer,
......@@ -6348,7 +6323,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{
"stateDuration",
TSDB_FUNC_STATE_DURATION,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
function_setup,
state_duration_function,
doFinalizer,
......
......@@ -3673,7 +3673,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
int32_t fid = pCtx[i].functionId;
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE ||
fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM || fid == TSDB_FUNC_UNIQUE ||
fid == TSDB_FUNC_TAIL || fid == TSDB_FUNC_STATE_COUNT || fid == TSDB_FUNC_STATE_DURATION) {
fid == TSDB_FUNC_TAIL) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
} else if (fid == TSDB_FUNC_INTERP) {
assert(pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pCtx[0].functionId == TSDB_FUNC_TS);
......@@ -3745,8 +3745,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_UNIQUE ||
functionId == TSDB_FUNC_TAIL || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION) {
functionId == TSDB_FUNC_TAIL) {
if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
} else if (functionId == TSDB_FUNC_INTERP) {
assert(pBInfo->pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pBInfo->pCtx[0].functionId == TSDB_FUNC_TS);
......@@ -4012,8 +4011,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE ||
functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION ) {
functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL) {
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
}
......@@ -4084,8 +4082,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_UNIQUE ||
functionId == TSDB_FUNC_TAIL || functionId == TSDB_FUNC_STATE_COUNT ||
functionId == TSDB_FUNC_STATE_DURATION) {
functionId == TSDB_FUNC_TAIL) {
if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
}
......
......@@ -48,8 +48,6 @@ static SKeyword keywordTable[] = {
{"OR", TK_OR},
{"AND", TK_AND},
{"NOT", TK_NOT},
{"EQ", TK_EQ},
{"NE", TK_NE},
{"ISNULL", TK_ISNULL},
{"NOTNULL", TK_NOTNULL},
{"IS", TK_IS},
......@@ -58,10 +56,6 @@ static SKeyword keywordTable[] = {
{"GLOB", TK_GLOB},
{"BETWEEN", TK_BETWEEN},
{"IN", TK_IN},
{"GT", TK_GT},
{"GE", TK_GE},
{"LT", TK_LT},
{"LE", TK_LE},
{"BITAND", TK_BITAND},
{"BITOR", TK_BITOR},
{"LSHIFT", TK_LSHIFT},
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册