From 6fae35afd024ca6f800a2955a512f8429141e66b Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 27 Mar 2020 17:55:54 +0800 Subject: [PATCH] [TD-32]add super table tag filter support at vnode side. --- src/client/inc/tscUtil.h | 2 +- src/client/src/tscSQLParser.c | 11 +- src/client/src/tscServer.c | 49 +++-- src/client/src/tscUtil.c | 3 +- src/inc/taosmsg.h | 4 +- src/query/inc/qast.h | 4 +- src/query/src/qast.c | 218 ++++++++++---------- src/query/src/queryExecutor.c | 41 ++-- src/util/inc/tarray.h | 16 +- src/util/src/tarray.c | 32 +++ src/vnode/detail/src/vnodeUtil.c | 3 +- src/vnode/tsdb/inc/tsdb.h | 11 +- src/vnode/tsdb/src/tsdbRead.c | 329 ++++++++++++++++++++++++++++++- 13 files changed, 556 insertions(+), 167 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 402b6b3706..6bec41d146 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -177,7 +177,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId); // get starter position of metric query condition (query on tags) in SSqlCmd.payload SCond* tsGetSTableQueryCondPos(STagCond* pCond, uint64_t tableIndex); -void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); +void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); void tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondRelease(STagCond* pCond); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 6d5d08292d..804d4e5268 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3511,7 +3511,7 @@ int tableNameCompar(const void* lhs, const void* rhs) { return ret > 0 ? 1 : -1; } -static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr, +static int32_t setTableCondForSTableQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr, int16_t tableCondIndex, SStringBuilder* sb) { const char* msg = "table name too long"; @@ -3740,7 +3740,7 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr, return ret; } - tsSetMetricQueryCond(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid, c); + tsSetSTableQueryCond(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid, c); doCompactQueryExpr(pExpr); tSQLExprDestroy(p1); @@ -3815,7 +3815,7 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql // 7. query condition for table name pQueryInfo->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR; - ret = setTableCondForMetricQuery(pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb); + ret = setTableCondForSTableQuery(pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb); taosStringBuilderDestroy(&sb); if (!validateFilterExpr(pQueryInfo)) { @@ -5444,8 +5444,9 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); // two table: the first one is for current table, and the secondary is for the super table. - tscAddEmptyMetaInfo(pQueryInfo); - assert(pQueryInfo->numOfTables == 2); + if (pQueryInfo->numOfTables < 2) { + tscAddEmptyMetaInfo(pQueryInfo); + } const int32_t TABLE_INDEX = 0; const int32_t STABLE_INDEX = 1; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b2bd9110e7..22f8a85757 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -601,12 +601,21 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; -// SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; if (pQueryInfo->colList.numOfCols <= 0) { tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta)); return -1; } + + if (pQueryInfo->intervalTime < 0) { + tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime); + return -1; + } + + if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) { + tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols); + return -1; + } char *pStart = pCmd->payload + tsRpcHeadSize; @@ -643,8 +652,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { numOfTables = 1; } - pQueryMsg->numOfTables = htonl(numOfTables); - if (pQueryInfo->order.order == TSQL_SO_ASC) { pQueryMsg->window.skey = htobe64(pQueryInfo->stime); pQueryMsg->window.ekey = htobe64(pQueryInfo->etime); @@ -653,6 +660,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->window.ekey = htobe64(pQueryInfo->stime); } + pQueryMsg->numOfTables = htonl(numOfTables); pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); pQueryMsg->interpoType = htons(pQueryInfo->interpoType); @@ -662,23 +670,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; - - if (pQueryInfo->intervalTime < 0) { - tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime); - return -1; - } - - if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) { - tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols); - return -1; - } - pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // query on meter - pQueryMsg->tagLength = 0; - } else { // query on super table - pQueryMsg->tagLength = htons(0); - } pQueryMsg->queryType = htons(pQueryInfo->type); pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs); @@ -742,7 +734,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { bool hasArithmeticFunction = false; SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg; - for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); @@ -856,6 +847,24 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); } + if (pQueryInfo->tagCond.numOfTagCond > 0) { + STagCond* pTagCond = &pQueryInfo->tagCond; + + SCond *pCond = tsGetSTableQueryCondPos(pTagCond, pTableMeta->uid); + if (pCond != NULL && pCond->cond != NULL) { + size_t condLen = strlen(pCond->cond) + 1; + + bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE); + if (!ret) { + tscError("%p mbs to ucs4 failed:%d", pSql, tsGetSTableQueryCondPos(pTagCond, pTableMeta->uid)); + return 0; + } + + pQueryMsg->tagCondLen = htons(condLen); + pMsg += condLen * TSDB_NCHAR_SIZE; + } + } + msgLen = pMsg - pStart; tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 8df84744ce..ec6881db3f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -102,7 +102,8 @@ SCond* tsGetSTableQueryCondPos(STagCond* pTagCond, uint64_t uid) { return NULL; } -void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) { +// todo refactor by using SArray +void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) { size_t len = strlen(str); if (len == 0) { return; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 1345d80fe5..999e293d42 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -474,14 +474,14 @@ typedef struct { int64_t intervalOffset; // start offset for interval query int64_t slidingTime; // value for sliding window char slidingTimeUnit; // time interval type, for revisement of interval(1d) - int16_t tagLength; // tag length in current query + uint16_t tagCondLen; // tag length in current query int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; int16_t orderType; // used in group by xx order by xxx uint64_t groupbyTagIds; int64_t limit; int64_t offset; - int16_t queryType; // denote another query process + uint16_t queryType; // denote another query process int16_t numOfOutputCols; // final output columns numbers int16_t interpoType; // interpolate type uint64_t defaultVal; // default value array list diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index 1e89ffb49d..d52eb24726 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include #include "os.h" #include "taosmsg.h" @@ -93,8 +94,7 @@ void tSQLBinaryExprToString(tSQLBinaryExpr *pExpr, char *dst, int32_t *len); void tSQLBinaryExprDestroy(tSQLBinaryExpr **pExprs, void (*fp)(void*)); -void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExprs, struct tSkipList *pSkipList, tQueryResultset *result, - SBinaryFilterSupp *param); +void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param); void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, char *(*cb)(void *, char *, int32_t)); diff --git a/src/query/src/qast.c b/src/query/src/qast.c index a0cbf12169..b301bbf043 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -14,6 +14,8 @@ */ #include "qast.h" +#include +#include #include "os.h" #include "qsqlparser.h" #include "qsyntaxtreefunction.h" @@ -705,126 +707,120 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu * @param pSchema tag schemas * @param fp filter callback function */ -static UNUSED_FUNC void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) { - int32_t n = 0; - for (int32_t i = 0; i < pResult->num; ++i) { - void *pItem = pResult->pRes[i]; +static void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, SArray *pResult, SBinaryFilterSupp *param) { + size_t size = taosArrayGetSize(pResult); + + SArray* array = taosArrayInit(size, POINTER_BYTES); + for (int32_t i = 0; i < size; ++i) { + void *pItem = taosArrayGetP(pResult, i); if (filterItem(pExpr, pItem, param)) { - pResult->pRes[n++] = pResult->pRes[i]; + taosArrayPush(array, &pItem); } } - - pResult->num = n; + + taosArrayCopy(pResult, array); } -//static void tSQLBinaryTraverseOnSkipList(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, tSkipList *pSkipList, -// SBinaryFilterSupp *param) { -// int32_t n = 0; -// SSkipListIterator iter = {0}; -// -// int32_t ret = tSkipListIteratorReset(pSkipList, &iter); -// assert(ret == 0); -// -// pResult->pRes = calloc(pSkipList->nSize, POINTER_BYTES); -// -// while (tSkipListIteratorNext(&iter)) { -// tSkipListNode *pNode = tSkipListIteratorGet(&iter); -// if (filterItem(pExpr, pNode, param)) { -// pResult->pRes[n++] = pNode; -// } -// } -// -// pResult->num = n; -//} +static void tSQLBinaryTraverseOnSkipList(tSQLBinaryExpr *pExpr, SArray *pResult, SSkipList *pSkipList, + SBinaryFilterSupp *param) { + SSkipListIterator* iter = tSkipListCreateIter(pSkipList); + + while (tSkipListIterNext(iter)) { + SSkipListNode *pNode = tSkipListIterGet(iter); + + if (filterItem(pExpr, pNode, param)) { + taosArrayPush(pResult, SL_GET_NODE_DATA(pNode)); + } + } +} // post-root order traverse syntax tree -//void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, tSkipList *pSkipList, tQueryResultset *result, -// SBinaryFilterSupp *param) { -// if (pExpr == NULL) { -// return; -// } -// -// tSQLSyntaxNode *pLeft = pExpr->pLeft; -// tSQLSyntaxNode *pRight = pExpr->pRight; -// -// // recursive traverse left child branch -// if (pLeft->nodeType == TSQL_NODE_EXPR || pRight->nodeType == TSQL_NODE_EXPR) { -// uint8_t weight = pLeft->pExpr->filterOnPrimaryKey + pRight->pExpr->filterOnPrimaryKey; -// -// if (weight == 0 && result->num > 0 && pSkipList == NULL) { -// /** -// * Perform the filter operation based on the initial filter result, which is obtained from filtering from index. -// * Since no index presented, the filter operation is done by scan all elements in the result set. -// * -// * if the query is a high selectivity filter, only small portion of meters are retrieved. -// */ -// tSQLBinaryTraverseOnResult(pExpr, result, param); -// } else if (weight == 0) { -// /** -// * apply the hierarchical expression to every node in skiplist for find the qualified nodes -// */ -// assert(result->num == 0); -// tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param); -// } else if (weight == 2 || (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_OR)) { -// tQueryResultset rLeft = {0}; -// tQueryResultset rRight = {0}; -// -// tSQLBinaryExprTraverse(pLeft->pExpr, pSkipList, &rLeft, param); -// tSQLBinaryExprTraverse(pRight->pExpr, pSkipList, &rRight, param); -// -// if (pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) { // CROSS -// intersect(&rLeft, &rRight, result); -// } else if (pExpr->nSQLBinaryOptr == TSDB_RELATION_OR) { // or -// merge(&rLeft, &rRight, result); -// } else { -// assert(false); -// } -// -// free(rLeft.pRes); -// free(rRight.pRes); -// } else { -// /* -// * (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here -// * -// * first, we filter results based on the skiplist index, which is the initial filter stage, -// * then, we conduct the secondary filter operation based on the result from the initial filter stage. -// */ -// assert(pExpr->nSQLBinaryOptr == TSDB_RELATION_AND); -// -// tSQLBinaryExpr *pFirst = NULL; -// tSQLBinaryExpr *pSecond = NULL; -// if (pLeft->pExpr->filterOnPrimaryKey == 1) { -// pFirst = pLeft->pExpr; -// pSecond = pRight->pExpr; -// } else { -// pFirst = pRight->pExpr; -// pSecond = pLeft->pExpr; -// } -// -// assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL); -// -// // we filter the result based on the skiplist index in the first place -// tSQLBinaryExprTraverse(pFirst, pSkipList, result, param); -// -// /* -// * recursively perform the filter operation based on the initial results, -// * So, we do not set the skiplist index as a parameter -// */ -// tSQLBinaryExprTraverse(pSecond, NULL, result, param); -// } -// } else { // column project -// assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE); -// -// param->setupInfoFn(pExpr, param->pExtInfo); -// if (pSkipList == NULL) { -// tSQLListTraverseOnResult(pExpr, param->fp, result); -// } else { +void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param) { + if (pExpr == NULL) { + return; + } + + tSQLSyntaxNode *pLeft = pExpr->pLeft; + tSQLSyntaxNode *pRight = pExpr->pRight; + + // recursive traverse left child branch + if (pLeft->nodeType == TSQL_NODE_EXPR || pRight->nodeType == TSQL_NODE_EXPR) { + uint8_t weight = pLeft->pExpr->filterOnPrimaryKey + pRight->pExpr->filterOnPrimaryKey; + + if (weight == 0 && taosArrayGetSize(result) > 0 && pSkipList == NULL) { + /** + * Perform the filter operation based on the initial filter result, which is obtained from filtering from index. + * Since no index presented, the filter operation is done by scan all elements in the result set. + * + * if the query is a high selectivity filter, only small portion of meters are retrieved. + */ + tSQLBinaryTraverseOnResult(pExpr, result, param); + } else if (weight == 0) { + /** + * apply the hierarchical expression to every node in skiplist for find the qualified nodes + */ + assert(taosArrayGetSize(result) == 0); + tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param); + } else if (weight == 2 || (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_OR)) { + tQueryResultset rLeft = {0}; + tQueryResultset rRight = {0}; + + tSQLBinaryExprTraverse(pLeft->pExpr, pSkipList, &rLeft, param); + tSQLBinaryExprTraverse(pRight->pExpr, pSkipList, &rRight, param); + + if (pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) { // CROSS + intersect(&rLeft, &rRight, result); + } else if (pExpr->nSQLBinaryOptr == TSDB_RELATION_OR) { // or + merge(&rLeft, &rRight, result); + } else { + assert(false); + } + + free(rLeft.pRes); + free(rRight.pRes); + } else { + /* + * (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here + * + * first, we filter results based on the skiplist index, which is the initial filter stage, + * then, we conduct the secondary filter operation based on the result from the initial filter stage. + */ + assert(pExpr->nSQLBinaryOptr == TSDB_RELATION_AND); + + tSQLBinaryExpr *pFirst = NULL; + tSQLBinaryExpr *pSecond = NULL; + if (pLeft->pExpr->filterOnPrimaryKey == 1) { + pFirst = pLeft->pExpr; + pSecond = pRight->pExpr; + } else { + pFirst = pRight->pExpr; + pSecond = pLeft->pExpr; + } + + assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL); + + // we filter the result based on the skiplist index in the first place + tSQLBinaryExprTraverse(pFirst, pSkipList, result, param); + + /* + * recursively perform the filter operation based on the initial results, + * So, we do not set the skiplist index as a parameter + */ + tSQLBinaryExprTraverse(pSecond, NULL, result, param); + } + } else { // column project + assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE); + + param->setupInfoFn(pExpr, param->pExtInfo); + if (pSkipList == NULL) { + tSQLListTraverseOnResult(pExpr, param->fp, result); + } else { // assert(result->num == 0); -//// tSQLDoFilterInitialResult(pSkipList, param->fp, pExpr->info, result); -// } -// } -//} +// tSQLDoFilterInitialResult(pSkipList, param->fp, pExpr->info, result); + } + } +} void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, char *(*getSourceDataBlock)(void *, char *, int32_t)) { diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index a1167ff440..86baf44f45 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -5310,11 +5310,6 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) { return -1; } - if (pQueryTableMsg->tagLength < 0) { - dError("qmsg:%p illegal value of tag length %d", pQueryTableMsg, pQueryTableMsg->tagLength); - return -1; - } - return 0; } @@ -5353,7 +5348,8 @@ static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArra * @param pExpr * @return */ -static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr) { +static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr, + wchar_t** tagCond) { pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables); pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey); @@ -5372,7 +5368,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId pQueryTableMsg->numOfCols = htons(pQueryTableMsg->numOfCols); pQueryTableMsg->numOfOutputCols = htons(pQueryTableMsg->numOfOutputCols); pQueryTableMsg->numOfGroupCols = htons(pQueryTableMsg->numOfGroupCols); - pQueryTableMsg->tagLength = htons(pQueryTableMsg->tagLength); + pQueryTableMsg->tagCondLen = htons(pQueryTableMsg->tagCondLen); pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset); pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen); @@ -5428,7 +5424,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId bool hasArithmeticFunction = false; *pExpr = calloc(pQueryTableMsg->numOfOutputCols, POINTER_BYTES); - SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg; for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) { @@ -5501,6 +5496,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) { v[i] = htobe64(v[i]); } + + pMsg += sizeof(int64_t) * pQueryTableMsg->numOfOutputCols; + } + + // the tag query condition expression string is located at the end of query msg + if (pQueryTableMsg->tagCondLen > 0) { + *tagCond = calloc(1, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE); + memcpy(*tagCond, pMsg, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE); } dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, " @@ -5993,9 +5996,9 @@ void vnodeFreeQInfo(SQInfo *pQInfo) { static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) { int32_t code = TSDB_CODE_SUCCESS; - + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList); - if (pQInfo == NULL) { + if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; goto _error; } @@ -6024,6 +6027,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE tsBufNextPos(pTSBuf); } + // filter the qualified if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) { goto _error; } @@ -6050,7 +6054,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ SArray *pTableIdList = NULL; SSqlFuncExprMsg** pExprMsg = NULL; - if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg)) != TSDB_CODE_SUCCESS) { + wchar_t* tagCond = NULL; + + if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) { return code; } @@ -6076,7 +6082,18 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ if ((pGroupbyExpr == NULL && pQueryTableMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { goto _query_over; } - + + // super table query + if ((pQueryTableMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { + STableId* id = taosArrayGet(pTableIdList, 0); + + SArray* res = tsdbQueryTableList(tsdb, id->uid, tagCond, pQueryTableMsg->tagCondLen); + if (taosArrayGetSize(res) == 0) { // no qualified table in stable query in this vnode + code = TSDB_CODE_SUCCESS; + goto _query_over; + } + } + code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); _query_over: diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index 55bdc849ad..6b70780d10 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -87,7 +87,21 @@ size_t taosArrayGetSize(const SArray* pArray); void* taosArrayInsert(SArray* pArray, size_t index, void* pData); /** - * + * remove data entry of the given index + * @param pArray + * @param index + */ +void taosArrayRemove(SArray* pArray, size_t index); + +/** + * copy the whole array from source to destination + * @param pDst + * @param pSrc + */ +void taosArrayCopy(SArray* pDst, SArray* pSrc); + +/** + * destroy array list * @param pArray */ void taosArrayDestroy(SArray* pArray); diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index fb2dac827e..eb1d213317 100755 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -128,6 +128,38 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData) { return dst; } +void taosArrayRemove(SArray* pArray, size_t index) { + assert(index < pArray->size); + + if (index == pArray->size - 1) { + taosArrayPop(pArray); + return; + } + + size_t remain = pArray->size - index - 1; + memmove(pArray->pData + index * pArray->elemSize, pArray->pData + (index + 1) * pArray->elemSize, remain * pArray->elemSize); + pArray->size -= 1; +} + +void taosArrayCopy(SArray* pDst, SArray* pSrc) { + assert(pSrc != NULL && pDst != NULL); + + if (pDst->capacity < pSrc->size) { + void* pData = realloc(pDst->pData, pSrc->size * pSrc->elemSize); + if (pData == NULL) { // todo handle oom + + } else { + pDst->pData = pData; + pDst->capacity = pSrc->size; + } + } + + memcpy(pDst->pData, pSrc->pData, pSrc->elemSize * pSrc->size); + pDst->elemSize = pSrc->elemSize; + pDst->capacity = pSrc->size; + pDst->size = pSrc->size; +} + void taosArrayDestroy(SArray* pArray) { if (pArray == NULL) { return; diff --git a/src/vnode/detail/src/vnodeUtil.c b/src/vnode/detail/src/vnodeUtil.c index 43c24bae6a..1d5f45e5e3 100644 --- a/src/vnode/detail/src/vnodeUtil.c +++ b/src/vnode/detail/src/vnodeUtil.c @@ -164,8 +164,7 @@ static int32_t vnodeBuildExprFromArithmeticStr(SSqlFunctionExpr* pExpr, SQueryMe SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols); dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz); - tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz, - pExpr->pBase.arg[0].argBytes); + tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz); if (pBinExpr == NULL) { dError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz); diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 92d8ad757b..ed6bb044c6 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -207,11 +207,6 @@ typedef struct SDataBlockInfo { int32_t sid; } SDataBlockInfo; -typedef struct STableIDList { - STableId *tableIds; - int32_t num; -} STableIDList; - typedef struct { } SFields; @@ -325,15 +320,15 @@ tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stable * @param pQueryHandle * @return table sid list. the invoker is responsible for the release of this the sid list. */ -STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); +SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); /** - * Get the qualified table sid for a super table according to the tag query expression. + * Get the qualified table id for a super table according to the tag query expression. * @param stableid. super table sid * @param pTagCond. tag query condition * */ -STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond); +SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index fc196bb62f..0140e2e11e 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -13,9 +13,12 @@ * along with this program. If not, see . */ +#include #include "os.h" #include "tutil.h" +#include "../../../query/inc/qast.h" +#include "../../../query/inc/tsqlfunction.h" #include "tsdb.h" #include "tsdbFile.h" #include "tsdbMeta.h" @@ -413,6 +416,328 @@ SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {} -STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {} +SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {} -STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {} +static SArray* createTableIdArrayList(struct STsdbRepo* tsdb, int64_t uid) { + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); + assert(pTable != NULL); //assert pTable is a super table + + size_t size = tSkipListGetSize(pTable->pIndex); + SArray* pList = taosArrayInit(size, sizeof(STableId)); + + SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex); + while(tSkipListIterNext(iter)) { + STable* t = *(STable**) tSkipListIterGet(iter); + taosArrayPush(pList, &t->tableId); + } + + return pList; +} + +typedef struct SSyntaxTreeFilterSupporter { + SSchema* pTagSchema; + int32_t numOfTags; + int32_t optr; +} SSyntaxTreeFilterSupporter; + +/** + * convert the result pointer to STabObj instead of tSkipListNode + * @param pRes + */ +static void tansformQueryResult(SArray* pRes) { + if (pRes == NULL || taosArrayGetSize(pRes) == 0) { + return; + } + + size_t size = taosArrayGetSize(pRes); + for (int32_t i = 0; i < size; ++i) { +// pRes->pRes[i] = ((tSkipListNode*)(pRes->pRes[i]))->pData; + } +} + +void tSQLListTraverseDestroyInfo(void* param) { + if (param == NULL) { + return; + } + + tQueryInfo* pInfo = (tQueryInfo*)param; + tVariantDestroy(&(pInfo->q)); + free(param); +} + +static char* convertTagQueryStr(const wchar_t* str, size_t len) { + char* mbs = NULL; + + if (len > 0) { + mbs = calloc(1, (len + 1) * TSDB_NCHAR_SIZE); + taosUcs4ToMbs((void*) str, len * TSDB_NCHAR_SIZE, mbs); //todo add log + } + + return mbs; +} + +static int32_t compareStrVal(const void* pLeft, const void* pRight) { + int32_t ret = strcmp(pLeft, pRight); + if (ret == 0) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +static int32_t compareWStrVal(const void* pLeft, const void* pRight) { + int32_t ret = wcscmp(pLeft, pRight); + if (ret == 0) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +static int32_t compareIntVal(const void* pLeft, const void* pRight) { + DEFAULT_COMP(GET_INT64_VAL(pLeft), GET_INT64_VAL(pRight)); +} + +static int32_t compareIntDoubleVal(const void* pLeft, const void* pRight) { + DEFAULT_COMP(GET_INT64_VAL(pLeft), GET_DOUBLE_VAL(pRight)); +} + +static int32_t compareDoubleVal(const void* pLeft, const void* pRight) { + DEFAULT_COMP(GET_DOUBLE_VAL(pLeft), GET_DOUBLE_VAL(pRight)); +} + +static int32_t compareDoubleIntVal(const void* pLeft, const void* pRight) { + double ret = (*(double*)pLeft) - (*(int64_t*)pRight); + if (fabs(ret) < DBL_EPSILON) { + return 0; + } else { + return ret > 0 ? 1 : -1; + } +} + +static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) { + SPatternCompareInfo pInfo = {'%', '_'}; + + const char* pattern = pRight; + const char* str = pLeft; + + int32_t ret = patternMatch(pattern, str, strlen(str), &pInfo); + + return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; +} + +static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) { + SPatternCompareInfo pInfo = {'%', '_'}; + + const wchar_t* pattern = pRight; + const wchar_t* str = pLeft; + + int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo); + + return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; +} + +static __compar_fn_t getFilterComparator(int32_t type, int32_t filterType, int32_t optr) { + __compar_fn_t comparator = NULL; + + switch (type) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_BOOL: { + if (filterType >= TSDB_DATA_TYPE_BOOL && filterType <= TSDB_DATA_TYPE_BIGINT) { + comparator = compareIntVal; + } else if (filterType >= TSDB_DATA_TYPE_FLOAT && filterType <= TSDB_DATA_TYPE_DOUBLE) { + comparator = compareIntDoubleVal; + } + break; + } + + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: { + if (filterType >= TSDB_DATA_TYPE_BOOL && filterType <= TSDB_DATA_TYPE_BIGINT) { + comparator = compareDoubleIntVal; + } else if (filterType >= TSDB_DATA_TYPE_FLOAT && filterType <= TSDB_DATA_TYPE_DOUBLE) { + comparator = compareDoubleVal; + } + break; + } + + case TSDB_DATA_TYPE_BINARY: { + assert(filterType == TSDB_DATA_TYPE_BINARY); + + if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */ + comparator = compareStrPatternComp; + } else { /* normal relational comparator */ + comparator = compareStrVal; + } + + break; + } + + case TSDB_DATA_TYPE_NCHAR: { + assert(filterType == TSDB_DATA_TYPE_NCHAR); + + if (optr == TSDB_RELATION_LIKE) { + comparator = compareWStrPatternComp; + } else { + comparator = compareWStrVal; + } + + break; + } + default: + comparator = compareIntVal; + break; + } + + return comparator; +} + +static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pSchema, int32_t* index, + int32_t* offset) { + *index = 0; + *offset = 0; + + // filter on table name(TBNAME) + if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) { + *index = TSDB_TBNAME_COLUMN_INDEX; + *offset = TSDB_TBNAME_COLUMN_INDEX; + return; + } + + while ((*index) < pSupporter->numOfTags) { + if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes && + pSupporter->pTagSchema[*index].type == pSchema->type && + strcmp(pSupporter->pTagSchema[*index].name, pSchema->name) == 0) { + break; + } else { + (*offset) += pSupporter->pTagSchema[(*index)++].bytes; + } + } +} + +void filterPrepare(void* expr, void* param) { + tSQLBinaryExpr *pExpr = (tSQLBinaryExpr*) expr; + if (pExpr->info != NULL) { + return; + } + + int32_t i = 0, offset = 0; + pExpr->info = calloc(1, sizeof(tQueryInfo)); + + tQueryInfo* pInfo = pExpr->info; + SSyntaxTreeFilterSupporter* pSupporter = (SSyntaxTreeFilterSupporter*)param; + + tVariant* pCond = pExpr->pRight->pVal; + SSchema* pSchema = pExpr->pLeft->pSchema; + + getTagColumnInfo(pSupporter, pSchema, &i, &offset); + assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX)); + assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX)); + + pInfo->sch = *pSchema; + pInfo->colIdx = i; + pInfo->optr = pExpr->nSQLBinaryOptr; + pInfo->offset = offset; + pInfo->compare = getFilterComparator(pSchema->type, pCond->nType, pInfo->optr); + + tVariantAssign(&pInfo->q, pCond); + tVariantTypeSetType(&pInfo->q, pInfo->sch.type); +} + +bool tSkipListNodeFilterCallback(const void* pNode, void* param) { + tQueryInfo* pInfo = (tQueryInfo*)param; + + STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); + + char buf[TSDB_MAX_TAGS_LEN] = {0}; + + char* val = dataRowTuple(pTable->tagVal); // todo not only the first column + int8_t type = pInfo->sch.type; + + int32_t ret = 0; + if (pInfo->q.nType == TSDB_DATA_TYPE_BINARY || pInfo->q.nType == TSDB_DATA_TYPE_NCHAR) { + ret = pInfo->compare(val, pInfo->q.pz); + } else { + tVariant t = {0}; + tVariantCreateFromBinary(&t, val, (uint32_t) pInfo->sch.bytes, type); + + ret = pInfo->compare(&t.i64Key, &pInfo->q.i64Key); + } + + switch (pInfo->optr) { + case TSDB_RELATION_EQUAL: { + return ret == 0; + } + case TSDB_RELATION_NOT_EQUAL: { + return ret != 0; + } + case TSDB_RELATION_LARGE_EQUAL: { + return ret >= 0; + } + case TSDB_RELATION_LARGE: { + return ret > 0; + } + case TSDB_RELATION_LESS_EQUAL: { + return ret <= 0; + } + case TSDB_RELATION_LESS: { + return ret < 0; + } + case TSDB_RELATION_LIKE: { + return ret == 0; + } + + default: + assert(false); + } + return true; +} + +static int32_t mgmtFilterMeterByIndex(STable* pSTable, SArray* pRes, const char* pCond) { + STColumn* stcol = schemaColAt(pSTable->tagSchema, 0); + + tSQLBinaryExpr* pExpr = NULL; + tSQLBinaryExprFromString(&pExpr, stcol, schemaNCols(pSTable->tagSchema), pCond, strlen(pCond)); + + // failed to build expression, no result, return immediately + if (pExpr == NULL) { + mError("table:%" PRIu64 ", no result returned, error in super table query expression:%s", pSTable->tableId.uid, pCond); + tfree(pCond); + + return TSDB_CODE_OPS_NOT_SUPPORT; + } + + // query according to the binary expression + SSyntaxTreeFilterSupporter s = {.pTagSchema = stcol, .numOfTags = schemaNCols(pSTable->tagSchema)}; + + SBinaryFilterSupp supp = {.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, + .setupInfoFn = (__do_filter_suppl_fn_t)filterPrepare, + .pExtInfo = &s}; + + tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp); + tSQLBinaryExprDestroy(&pExpr, tSQLListTraverseDestroyInfo); + + tansformQueryResult(pRes); + + return TSDB_CODE_SUCCESS; +} + +SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) { + // no condition, all tables created according to the stable will involved in querying + if (pTagCond == NULL || wcslen(pTagCond) == 0) { + return createTableIdArrayList(tsdb, uid); + } else { + char* str = convertTagQueryStr(pTagCond, len); + SArray* result = taosArrayInit(8, POINTER_BYTES); + + STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); + assert(pSTable != NULL); + + if (mgmtFilterMeterByIndex(pSTable, result, str) == TSDB_CODE_SUCCESS) { + return result; + } + } +} -- GitLab