提交 4830b7d9 编写于 作者: X Xiaoyu Wang

feat: support pseudo columns such as _qstart, _qend and _qduration

上级 d2ce16a8
......@@ -198,6 +198,7 @@ bool fmIsInterpFunc(int32_t funcId);
bool fmIsLastRowFunc(int32_t funcId);
bool fmIsSystemInfoFunc(int32_t funcId);
bool fmIsImplicitTsFunc(int32_t funcId);
bool fmIsClientPseudoColumnFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
......
......@@ -248,6 +248,7 @@ typedef struct SSelectStmt {
SNodeList* pOrderByList; // SOrderByExprNode
SLimitNode* pLimit;
SLimitNode* pSlimit;
STimeWindow timeRange;
char stmtName[TSDB_TABLE_NAME_LEN];
uint8_t precision;
int32_t selectFuncNum;
......
......@@ -47,6 +47,7 @@ extern "C" {
#define FUNC_MGT_FORBID_WINDOW_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(18)
#define FUNC_MGT_FORBID_GROUP_BY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(19)
#define FUNC_MGT_SYSTEM_INFO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(20)
#define FUNC_MGT_CLIENT_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(21)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......
......@@ -2786,31 +2786,31 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_qstart",
.type = FUNCTION_TYPE_QSTART,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_CLIENT_PC_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = qStartTsFunction, // todo
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "_qend",
.type = FUNCTION_TYPE_QEND,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_CLIENT_PC_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = qEndTsFunction, // todo
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "_qduration",
.type = FUNCTION_TYPE_QDURATION,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_CLIENT_PC_FUNC,
.translateFunc = translateWduration,
.getEnvFunc = getTimePseudoFuncEnv,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = winDurFunction, // todo
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
......
......@@ -183,6 +183,8 @@ bool fmIsSystemInfoFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId,
bool fmIsImplicitTsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IMPLICIT_TS_FUNC); }
bool fmIsClientPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_CLIENT_PC_FUNC); }
bool fmIsInterpFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
......
......@@ -127,6 +127,7 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
COPY_SCALAR_FIELD(isDuration);
COPY_SCALAR_FIELD(translate);
COPY_SCALAR_FIELD(notReserved);
COPY_SCALAR_FIELD(isNull);
COPY_SCALAR_FIELD(placeholderNo);
COPY_SCALAR_FIELD(typeData);
COPY_SCALAR_FIELD(unit);
......
......@@ -2712,6 +2712,7 @@ static const char* jkValueLiteral = "Literal";
static const char* jkValueDuration = "Duration";
static const char* jkValueTranslate = "Translate";
static const char* jkValueNotReserved = "NotReserved";
static const char* jkValueIsNull = "IsNull";
static const char* jkValueDatum = "Datum";
static int32_t datumToJson(const void* pObj, SJson* pJson) {
......@@ -2798,6 +2799,9 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkValueNotReserved, pNode->notReserved);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkValueIsNull, pNode->isNull);
}
if (TSDB_CODE_SUCCESS == code && pNode->translate) {
code = datumToJson(pNode, pJson);
}
......@@ -2945,6 +2949,9 @@ static int32_t jsonToValueNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkValueNotReserved, &pNode->notReserved);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkValueIsNull, &pNode->isNull);
}
if (TSDB_CODE_SUCCESS == code && pNode->translate) {
code = jsonToDatum(pJson, pNode);
}
......
......@@ -1692,6 +1692,11 @@ int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots) {
}
void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
if (pNode->isNull) {
pVal->nType = TSDB_DATA_TYPE_NULL;
pVal->nLen = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
return;
}
pVal->nType = pNode->node.resType.type;
pVal->nLen = pNode->node.resType.bytes;
switch (pNode->node.resType.type) {
......
......@@ -740,6 +740,7 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
select->pFromTable = pTable;
sprintf(select->stmtName, "%p", select);
select->isTimeLineResult = true;
select->timeRange = TSWINDOW_INITIALIZER;
return (SNode*)select;
}
......
......@@ -1198,7 +1198,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
}
}
static int32_t rewriteSystemInfoFuncImpl(STranslateContext* pCxt, char* pLiteral, SNode** pNode) {
static int32_t rewriteFuncToValue(STranslateContext* pCxt, char* pLiteral, SNode** pNode) {
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == pVal) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -1229,7 +1229,7 @@ static int32_t rewriteDatabaseFunc(STranslateContext* pCxt, SNode** pNode) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return rewriteSystemInfoFuncImpl(pCxt, pCurrDb, pNode);
return rewriteFuncToValue(pCxt, pCurrDb, pNode);
}
static int32_t rewriteClentVersionFunc(STranslateContext* pCxt, SNode** pNode) {
......@@ -1237,7 +1237,7 @@ static int32_t rewriteClentVersionFunc(STranslateContext* pCxt, SNode** pNode) {
if (NULL == pVer) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return rewriteSystemInfoFuncImpl(pCxt, pVer, pNode);
return rewriteFuncToValue(pCxt, pVer, pNode);
}
static int32_t rewriteServerVersionFunc(STranslateContext* pCxt, SNode** pNode) {
......@@ -1245,7 +1245,7 @@ static int32_t rewriteServerVersionFunc(STranslateContext* pCxt, SNode** pNode)
if (NULL == pVer) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return rewriteSystemInfoFuncImpl(pCxt, pVer, pNode);
return rewriteFuncToValue(pCxt, pVer, pNode);
}
static int32_t rewriteServerStatusFunc(STranslateContext* pCxt, SNode** pNode) {
......@@ -1253,7 +1253,7 @@ static int32_t rewriteServerStatusFunc(STranslateContext* pCxt, SNode** pNode) {
return TSDB_CODE_RPC_NETWORK_UNAVAIL;
}
char* pStatus = taosMemoryStrDup((void*)"1");
return rewriteSystemInfoFuncImpl(pCxt, pStatus, pNode);
return rewriteFuncToValue(pCxt, pStatus, pNode);
}
static int32_t rewriteUserFunc(STranslateContext* pCxt, SNode** pNode) {
......@@ -1264,7 +1264,7 @@ static int32_t rewriteUserFunc(STranslateContext* pCxt, SNode** pNode) {
if (NULL == pUserConn) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return rewriteSystemInfoFuncImpl(pCxt, pUserConn, pNode);
return rewriteFuncToValue(pCxt, pUserConn, pNode);
}
static int32_t rewriteSystemInfoFunc(STranslateContext* pCxt, SNode** pNode) {
......@@ -1318,10 +1318,60 @@ static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* p
return code;
}
static int32_t rewriteQueryTimeFunc(STranslateContext* pCxt, int64_t val, SNode** pNode) {
if (INT64_MIN == val || INT64_MAX == val) {
return rewriteFuncToValue(pCxt, NULL, pNode);
}
char* pStr = taosMemoryCalloc(1, 20);
if (NULL == pStr) {
return TSDB_CODE_OUT_OF_MEMORY;
}
snprintf(pStr, 20, "%" PRId64 "", val);
return rewriteFuncToValue(pCxt, pStr, pNode);
}
static int32_t rewriteQstartFunc(STranslateContext* pCxt, SNode** pNode) {
return rewriteQueryTimeFunc(pCxt, ((SSelectStmt*)pCxt->pCurrStmt)->timeRange.skey, pNode);
}
static int32_t rewriteQendFunc(STranslateContext* pCxt, SNode** pNode) {
return rewriteQueryTimeFunc(pCxt, ((SSelectStmt*)pCxt->pCurrStmt)->timeRange.ekey, pNode);
}
static int32_t rewriteQdurationFunc(STranslateContext* pCxt, SNode** pNode) {
STimeWindow range = ((SSelectStmt*)pCxt->pCurrStmt)->timeRange;
if (INT64_MIN == range.skey || INT64_MAX == range.ekey) {
return rewriteQueryTimeFunc(pCxt, INT64_MIN, pNode);
}
return rewriteQueryTimeFunc(pCxt, range.ekey - range.skey + 1, pNode);
}
static int32_t rewriteClientPseudoColumnFunc(STranslateContext* pCxt, SNode** pNode) {
if (NULL == pCxt->pCurrStmt || QUERY_NODE_SELECT_STMT != nodeType(pCxt->pCurrStmt) ||
pCxt->currClause <= SQL_CLAUSE_WHERE) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, "Illegal pseudo column");
}
switch (((SFunctionNode*)*pNode)->funcType) {
case FUNCTION_TYPE_QSTART:
return rewriteQstartFunc(pCxt, pNode);
case FUNCTION_TYPE_QEND:
return rewriteQendFunc(pCxt, pNode);
case FUNCTION_TYPE_QDURATION:
return rewriteQdurationFunc(pCxt, pNode);
default:
break;
}
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
static int32_t translateFunctionImpl(STranslateContext* pCxt, SFunctionNode** pFunc) {
if (fmIsSystemInfoFunc((*pFunc)->funcId)) {
return rewriteSystemInfoFunc(pCxt, (SNode**)pFunc);
}
if (fmIsClientPseudoColumnFunc((*pFunc)->funcId)) {
return rewriteClientPseudoColumnFunc(pCxt, (SNode**)pFunc);
}
return translateNoramlFunction(pCxt, *pFunc);
}
......@@ -2078,7 +2128,7 @@ static int32_t getTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bo
return code;
}
static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWindow* pTimeRange) {
static int32_t getQueryTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWindow* pTimeRange) {
if (NULL == pWhere) {
*pTimeRange = TSWINDOW_INITIALIZER;
return TSDB_CODE_SUCCESS;
......@@ -2139,16 +2189,13 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode*
return TSDB_CODE_SUCCESS;
}
static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWindowNode* pInterval) {
static int32_t translateFill(STranslateContext* pCxt, SSelectStmt* pSelect, SIntervalWindowNode* pInterval) {
if (NULL == pInterval->pFill) {
return TSDB_CODE_SUCCESS;
}
int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange));
if (TSDB_CODE_SUCCESS == code) {
code = checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval);
}
return code;
((SFillNode*)pInterval->pFill)->timeRange = pSelect->timeRange;
return checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval);
}
static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit) {
......@@ -2235,7 +2282,7 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
static int32_t translateIntervalWindow(STranslateContext* pCxt, SSelectStmt* pSelect, SIntervalWindowNode* pInterval) {
int32_t code = checkIntervalWindow(pCxt, pInterval);
if (TSDB_CODE_SUCCESS == code) {
code = translateFill(pCxt, pSelect->pWhere, pInterval);
code = translateFill(pCxt, pSelect, pInterval);
}
return code;
}
......@@ -2330,7 +2377,7 @@ static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect
code = translateExpr(pCxt, &pSelect->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = getFillTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange));
code = getQueryTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange));
}
if (TSDB_CODE_SUCCESS == code) {
code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery);
......@@ -2362,9 +2409,24 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartiti
return translateExprList(pCxt, pPartitionByList);
}
static int32_t translateWhere(STranslateContext* pCxt, SNode** pWhere) {
static bool isDataTable(int8_t tableType) {
return TSDB_SUPER_TABLE == tableType || TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType;
}
static bool needCalcTimeRange(SSelectStmt* pSelect) {
if (QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable)) {
return false;
}
return isDataTable(((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType);
}
static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->currClause = SQL_CLAUSE_WHERE;
return translateExpr(pCxt, pWhere);
int32_t code = translateExpr(pCxt, &pSelect->pWhere);
if (TSDB_CODE_SUCCESS == code && needCalcTimeRange(pSelect)) {
code = getQueryTimeRange(pCxt, pSelect->pWhere, &pSelect->timeRange);
}
return code;
}
static int32_t translateFrom(STranslateContext* pCxt, SNode* pTable) {
......@@ -2495,7 +2557,7 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
int32_t code = translateFrom(pCxt, pSelect->pFromTable);
if (TSDB_CODE_SUCCESS == code) {
pSelect->precision = ((STableNode*)pSelect->pFromTable)->precision;
code = translateWhere(pCxt, &pSelect->pWhere);
code = translateWhere(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = translatePartitionBy(pCxt, pSelect->pPartitionByList);
......@@ -2681,7 +2743,8 @@ static int32_t partitionDeleteWhere(STranslateContext* pCxt, SDeleteStmt* pDelet
}
static int32_t translateDeleteWhere(STranslateContext* pCxt, SDeleteStmt* pDelete) {
int32_t code = translateWhere(pCxt, &pDelete->pWhere);
pCxt->currClause = SQL_CLAUSE_WHERE;
int32_t code = translateExpr(pCxt, &pDelete->pWhere);
if (TSDB_CODE_SUCCESS == code) {
code = partitionDeleteWhere(pCxt, pDelete);
}
......
......@@ -141,6 +141,11 @@ TEST_F(PlanBasicTest, pseudoColumn) {
useDb("root", "test");
run("SELECT _QSTART, _QEND, _QDURATION FROM t1");
run("SELECT _QSTART, _QEND, _QDURATION FROM t1 WHERE ts BETWEEN '2017-7-14 18:00:00' AND '2017-7-14 19:00:00'");
run("SELECT _QSTART, _QEND, _QDURATION, _WSTART, _WEND, _WDURATION, COUNT(*) FROM t1 "
"WHERE ts BETWEEN '2017-7-14 18:00:00' AND '2017-7-14 19:00:00' INTERVAL(10S)");
}
TEST_F(PlanBasicTest, withoutFrom) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册