提交 6fae35af 编写于 作者: H hjxilinx

[TD-32]add super table tag filter support at vnode side.

上级 f77c3a69
......@@ -177,7 +177,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetSTableQueryCondPos(STagCond* pCond, uint64_t tableIndex);
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tscTagCondCopy(STagCond* dest, const STagCond* src);
void tscTagCondRelease(STagCond* pCond);
......
......@@ -3511,7 +3511,7 @@ int tableNameCompar(const void* lhs, const void* rhs) {
return ret > 0 ? 1 : -1;
}
static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr,
static int32_t setTableCondForSTableQuery(SQueryInfo* pQueryInfo, const char* account, tSQLExpr* pExpr,
int16_t tableCondIndex, SStringBuilder* sb) {
const char* msg = "table name too long";
......@@ -3740,7 +3740,7 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
return ret;
}
tsSetMetricQueryCond(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid, c);
tsSetSTableQueryCond(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid, c);
doCompactQueryExpr(pExpr);
tSQLExprDestroy(p1);
......@@ -3815,7 +3815,7 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql
// 7. query condition for table name
pQueryInfo->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
ret = setTableCondForMetricQuery(pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
ret = setTableCondForSTableQuery(pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
taosStringBuilderDestroy(&sb);
if (!validateFilterExpr(pQueryInfo)) {
......@@ -5444,8 +5444,9 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
// two table: the first one is for current table, and the secondary is for the super table.
tscAddEmptyMetaInfo(pQueryInfo);
assert(pQueryInfo->numOfTables == 2);
if (pQueryInfo->numOfTables < 2) {
tscAddEmptyMetaInfo(pQueryInfo);
}
const int32_t TABLE_INDEX = 0;
const int32_t STABLE_INDEX = 1;
......
......@@ -601,12 +601,21 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
// SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
if (pQueryInfo->colList.numOfCols <= 0) {
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
return -1;
}
if (pQueryInfo->intervalTime < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
return -1;
}
if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
return -1;
}
char *pStart = pCmd->payload + tsRpcHeadSize;
......@@ -643,8 +652,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
numOfTables = 1;
}
pQueryMsg->numOfTables = htonl(numOfTables);
if (pQueryInfo->order.order == TSQL_SO_ASC) {
pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
......@@ -653,6 +660,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
}
pQueryMsg->numOfTables = htonl(numOfTables);
pQueryMsg->order = htons(pQueryInfo->order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
......@@ -662,23 +670,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
if (pQueryInfo->intervalTime < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
return -1;
}
if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) {
tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
return -1;
}
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // query on meter
pQueryMsg->tagLength = 0;
} else { // query on super table
pQueryMsg->tagLength = htons(0);
}
pQueryMsg->queryType = htons(pQueryInfo->type);
pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
......@@ -742,7 +734,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
bool hasArithmeticFunction = false;
SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
......@@ -856,6 +847,24 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
}
if (pQueryInfo->tagCond.numOfTagCond > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCondPos(pTagCond, pTableMeta->uid);
if (pCond != NULL && pCond->cond != NULL) {
size_t condLen = strlen(pCond->cond) + 1;
bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
if (!ret) {
tscError("%p mbs to ucs4 failed:%d", pSql, tsGetSTableQueryCondPos(pTagCond, pTableMeta->uid));
return 0;
}
pQueryMsg->tagCondLen = htons(condLen);
pMsg += condLen * TSDB_NCHAR_SIZE;
}
}
msgLen = pMsg - pStart;
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
......
......@@ -102,7 +102,8 @@ SCond* tsGetSTableQueryCondPos(STagCond* pTagCond, uint64_t uid) {
return NULL;
}
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) {
// todo refactor by using SArray
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, const char* str) {
size_t len = strlen(str);
if (len == 0) {
return;
......
......@@ -474,14 +474,14 @@ typedef struct {
int64_t intervalOffset; // start offset for interval query
int64_t slidingTime; // value for sliding window
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int16_t tagLength; // tag length in current query
uint16_t tagCondLen; // tag length in current query
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
uint64_t groupbyTagIds;
int64_t limit;
int64_t offset;
int16_t queryType; // denote another query process
uint16_t queryType; // denote another query process
int16_t numOfOutputCols; // final output columns numbers
int16_t interpoType; // interpolate type
uint64_t defaultVal; // default value array list
......
......@@ -20,6 +20,7 @@
extern "C" {
#endif
#include <tskiplist.h>
#include "os.h"
#include "taosmsg.h"
......@@ -93,8 +94,7 @@ void tSQLBinaryExprToString(tSQLBinaryExpr *pExpr, char *dst, int32_t *len);
void tSQLBinaryExprDestroy(tSQLBinaryExpr **pExprs, void (*fp)(void*));
void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExprs, struct tSkipList *pSkipList, tQueryResultset *result,
SBinaryFilterSupp *param);
void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param);
void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, char *, int32_t));
......
......@@ -14,6 +14,8 @@
*/
#include "qast.h"
#include <tarray.h>
#include <tskiplist.h>
#include "os.h"
#include "qsqlparser.h"
#include "qsyntaxtreefunction.h"
......@@ -705,126 +707,120 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu
* @param pSchema tag schemas
* @param fp filter callback function
*/
static UNUSED_FUNC void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) {
int32_t n = 0;
for (int32_t i = 0; i < pResult->num; ++i) {
void *pItem = pResult->pRes[i];
static void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, SArray *pResult, SBinaryFilterSupp *param) {
size_t size = taosArrayGetSize(pResult);
SArray* array = taosArrayInit(size, POINTER_BYTES);
for (int32_t i = 0; i < size; ++i) {
void *pItem = taosArrayGetP(pResult, i);
if (filterItem(pExpr, pItem, param)) {
pResult->pRes[n++] = pResult->pRes[i];
taosArrayPush(array, &pItem);
}
}
pResult->num = n;
taosArrayCopy(pResult, array);
}
//static void tSQLBinaryTraverseOnSkipList(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, tSkipList *pSkipList,
// SBinaryFilterSupp *param) {
// int32_t n = 0;
// SSkipListIterator iter = {0};
//
// int32_t ret = tSkipListIteratorReset(pSkipList, &iter);
// assert(ret == 0);
//
// pResult->pRes = calloc(pSkipList->nSize, POINTER_BYTES);
//
// while (tSkipListIteratorNext(&iter)) {
// tSkipListNode *pNode = tSkipListIteratorGet(&iter);
// if (filterItem(pExpr, pNode, param)) {
// pResult->pRes[n++] = pNode;
// }
// }
//
// pResult->num = n;
//}
static void tSQLBinaryTraverseOnSkipList(tSQLBinaryExpr *pExpr, SArray *pResult, SSkipList *pSkipList,
SBinaryFilterSupp *param) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (filterItem(pExpr, pNode, param)) {
taosArrayPush(pResult, SL_GET_NODE_DATA(pNode));
}
}
}
// post-root order traverse syntax tree
//void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, tSkipList *pSkipList, tQueryResultset *result,
// SBinaryFilterSupp *param) {
// if (pExpr == NULL) {
// return;
// }
//
// tSQLSyntaxNode *pLeft = pExpr->pLeft;
// tSQLSyntaxNode *pRight = pExpr->pRight;
//
// // recursive traverse left child branch
// if (pLeft->nodeType == TSQL_NODE_EXPR || pRight->nodeType == TSQL_NODE_EXPR) {
// uint8_t weight = pLeft->pExpr->filterOnPrimaryKey + pRight->pExpr->filterOnPrimaryKey;
//
// if (weight == 0 && result->num > 0 && pSkipList == NULL) {
// /**
// * Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
// * Since no index presented, the filter operation is done by scan all elements in the result set.
// *
// * if the query is a high selectivity filter, only small portion of meters are retrieved.
// */
// tSQLBinaryTraverseOnResult(pExpr, result, param);
// } else if (weight == 0) {
// /**
// * apply the hierarchical expression to every node in skiplist for find the qualified nodes
// */
// assert(result->num == 0);
// tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
// } else if (weight == 2 || (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_OR)) {
// tQueryResultset rLeft = {0};
// tQueryResultset rRight = {0};
//
// tSQLBinaryExprTraverse(pLeft->pExpr, pSkipList, &rLeft, param);
// tSQLBinaryExprTraverse(pRight->pExpr, pSkipList, &rRight, param);
//
// if (pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) { // CROSS
// intersect(&rLeft, &rRight, result);
// } else if (pExpr->nSQLBinaryOptr == TSDB_RELATION_OR) { // or
// merge(&rLeft, &rRight, result);
// } else {
// assert(false);
// }
//
// free(rLeft.pRes);
// free(rRight.pRes);
// } else {
// /*
// * (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
// *
// * first, we filter results based on the skiplist index, which is the initial filter stage,
// * then, we conduct the secondary filter operation based on the result from the initial filter stage.
// */
// assert(pExpr->nSQLBinaryOptr == TSDB_RELATION_AND);
//
// tSQLBinaryExpr *pFirst = NULL;
// tSQLBinaryExpr *pSecond = NULL;
// if (pLeft->pExpr->filterOnPrimaryKey == 1) {
// pFirst = pLeft->pExpr;
// pSecond = pRight->pExpr;
// } else {
// pFirst = pRight->pExpr;
// pSecond = pLeft->pExpr;
// }
//
// assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL);
//
// // we filter the result based on the skiplist index in the first place
// tSQLBinaryExprTraverse(pFirst, pSkipList, result, param);
//
// /*
// * recursively perform the filter operation based on the initial results,
// * So, we do not set the skiplist index as a parameter
// */
// tSQLBinaryExprTraverse(pSecond, NULL, result, param);
// }
// } else { // column project
// assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE);
//
// param->setupInfoFn(pExpr, param->pExtInfo);
// if (pSkipList == NULL) {
// tSQLListTraverseOnResult(pExpr, param->fp, result);
// } else {
void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param) {
if (pExpr == NULL) {
return;
}
tSQLSyntaxNode *pLeft = pExpr->pLeft;
tSQLSyntaxNode *pRight = pExpr->pRight;
// recursive traverse left child branch
if (pLeft->nodeType == TSQL_NODE_EXPR || pRight->nodeType == TSQL_NODE_EXPR) {
uint8_t weight = pLeft->pExpr->filterOnPrimaryKey + pRight->pExpr->filterOnPrimaryKey;
if (weight == 0 && taosArrayGetSize(result) > 0 && pSkipList == NULL) {
/**
* Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
* Since no index presented, the filter operation is done by scan all elements in the result set.
*
* if the query is a high selectivity filter, only small portion of meters are retrieved.
*/
tSQLBinaryTraverseOnResult(pExpr, result, param);
} else if (weight == 0) {
/**
* apply the hierarchical expression to every node in skiplist for find the qualified nodes
*/
assert(taosArrayGetSize(result) == 0);
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
} else if (weight == 2 || (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_OR)) {
tQueryResultset rLeft = {0};
tQueryResultset rRight = {0};
tSQLBinaryExprTraverse(pLeft->pExpr, pSkipList, &rLeft, param);
tSQLBinaryExprTraverse(pRight->pExpr, pSkipList, &rRight, param);
if (pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) { // CROSS
intersect(&rLeft, &rRight, result);
} else if (pExpr->nSQLBinaryOptr == TSDB_RELATION_OR) { // or
merge(&rLeft, &rRight, result);
} else {
assert(false);
}
free(rLeft.pRes);
free(rRight.pRes);
} else {
/*
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
*
* first, we filter results based on the skiplist index, which is the initial filter stage,
* then, we conduct the secondary filter operation based on the result from the initial filter stage.
*/
assert(pExpr->nSQLBinaryOptr == TSDB_RELATION_AND);
tSQLBinaryExpr *pFirst = NULL;
tSQLBinaryExpr *pSecond = NULL;
if (pLeft->pExpr->filterOnPrimaryKey == 1) {
pFirst = pLeft->pExpr;
pSecond = pRight->pExpr;
} else {
pFirst = pRight->pExpr;
pSecond = pLeft->pExpr;
}
assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL);
// we filter the result based on the skiplist index in the first place
tSQLBinaryExprTraverse(pFirst, pSkipList, result, param);
/*
* recursively perform the filter operation based on the initial results,
* So, we do not set the skiplist index as a parameter
*/
tSQLBinaryExprTraverse(pSecond, NULL, result, param);
}
} else { // column project
assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE);
param->setupInfoFn(pExpr, param->pExtInfo);
if (pSkipList == NULL) {
tSQLListTraverseOnResult(pExpr, param->fp, result);
} else {
// assert(result->num == 0);
//// tSQLDoFilterInitialResult(pSkipList, param->fp, pExpr->info, result);
// }
// }
//}
// tSQLDoFilterInitialResult(pSkipList, param->fp, pExpr->info, result);
}
}
}
void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*getSourceDataBlock)(void *, char *, int32_t)) {
......
......@@ -5310,11 +5310,6 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) {
return -1;
}
if (pQueryTableMsg->tagLength < 0) {
dError("qmsg:%p illegal value of tag length %d", pQueryTableMsg, pQueryTableMsg->tagLength);
return -1;
}
return 0;
}
......@@ -5353,7 +5348,8 @@ static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArra
* @param pExpr
* @return
*/
static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr) {
static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr,
wchar_t** tagCond) {
pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables);
pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey);
......@@ -5372,7 +5368,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
pQueryTableMsg->numOfCols = htons(pQueryTableMsg->numOfCols);
pQueryTableMsg->numOfOutputCols = htons(pQueryTableMsg->numOfOutputCols);
pQueryTableMsg->numOfGroupCols = htons(pQueryTableMsg->numOfGroupCols);
pQueryTableMsg->tagLength = htons(pQueryTableMsg->tagLength);
pQueryTableMsg->tagCondLen = htons(pQueryTableMsg->tagCondLen);
pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset);
pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen);
......@@ -5428,7 +5424,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
bool hasArithmeticFunction = false;
*pExpr = calloc(pQueryTableMsg->numOfOutputCols, POINTER_BYTES);
SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg;
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
......@@ -5501,6 +5496,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
v[i] = htobe64(v[i]);
}
pMsg += sizeof(int64_t) * pQueryTableMsg->numOfOutputCols;
}
// the tag query condition expression string is located at the end of query msg
if (pQueryTableMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE);
memcpy(*tagCond, pMsg, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE);
}
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, "
......@@ -5993,9 +5996,9 @@ void vnodeFreeQInfo(SQInfo *pQInfo) {
static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) {
int32_t code = TSDB_CODE_SUCCESS;
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
if (pQInfo == NULL) {
if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
......@@ -6024,6 +6027,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
tsBufNextPos(pTSBuf);
}
// filter the qualified
if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -6050,7 +6054,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ
SArray *pTableIdList = NULL;
SSqlFuncExprMsg** pExprMsg = NULL;
if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg)) != TSDB_CODE_SUCCESS) {
wchar_t* tagCond = NULL;
if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -6076,7 +6082,18 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ
if ((pGroupbyExpr == NULL && pQueryTableMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _query_over;
}
// super table query
if ((pQueryTableMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
STableId* id = taosArrayGet(pTableIdList, 0);
SArray* res = tsdbQueryTableList(tsdb, id->uid, tagCond, pQueryTableMsg->tagCondLen);
if (taosArrayGetSize(res) == 0) { // no qualified table in stable query in this vnode
code = TSDB_CODE_SUCCESS;
goto _query_over;
}
}
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
_query_over:
......
......@@ -87,7 +87,21 @@ size_t taosArrayGetSize(const SArray* pArray);
void* taosArrayInsert(SArray* pArray, size_t index, void* pData);
/**
*
* remove data entry of the given index
* @param pArray
* @param index
*/
void taosArrayRemove(SArray* pArray, size_t index);
/**
* copy the whole array from source to destination
* @param pDst
* @param pSrc
*/
void taosArrayCopy(SArray* pDst, SArray* pSrc);
/**
* destroy array list
* @param pArray
*/
void taosArrayDestroy(SArray* pArray);
......
......@@ -128,6 +128,38 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
return dst;
}
void taosArrayRemove(SArray* pArray, size_t index) {
assert(index < pArray->size);
if (index == pArray->size - 1) {
taosArrayPop(pArray);
return;
}
size_t remain = pArray->size - index - 1;
memmove(pArray->pData + index * pArray->elemSize, pArray->pData + (index + 1) * pArray->elemSize, remain * pArray->elemSize);
pArray->size -= 1;
}
void taosArrayCopy(SArray* pDst, SArray* pSrc) {
assert(pSrc != NULL && pDst != NULL);
if (pDst->capacity < pSrc->size) {
void* pData = realloc(pDst->pData, pSrc->size * pSrc->elemSize);
if (pData == NULL) { // todo handle oom
} else {
pDst->pData = pData;
pDst->capacity = pSrc->size;
}
}
memcpy(pDst->pData, pSrc->pData, pSrc->elemSize * pSrc->size);
pDst->elemSize = pSrc->elemSize;
pDst->capacity = pSrc->size;
pDst->size = pSrc->size;
}
void taosArrayDestroy(SArray* pArray) {
if (pArray == NULL) {
return;
......
......@@ -164,8 +164,7 @@ static int32_t vnodeBuildExprFromArithmeticStr(SSqlFunctionExpr* pExpr, SQueryMe
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz,
pExpr->pBase.arg[0].argBytes);
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz);
if (pBinExpr == NULL) {
dError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
......
......@@ -207,11 +207,6 @@ typedef struct SDataBlockInfo {
int32_t sid;
} SDataBlockInfo;
typedef struct STableIDList {
STableId *tableIds;
int32_t num;
} STableIDList;
typedef struct {
} SFields;
......@@ -325,15 +320,15 @@ tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stable
* @param pQueryHandle
* @return table sid list. the invoker is responsible for the release of this the sid list.
*/
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
/**
* Get the qualified table sid for a super table according to the tag query expression.
* Get the qualified table id for a super table according to the tag query expression.
* @param stableid. super table sid
* @param pTagCond. tag query condition
*
*/
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond);
SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
#ifdef __cplusplus
}
......
......@@ -13,9 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tlog.h>
#include "os.h"
#include "tutil.h"
#include "../../../query/inc/qast.h"
#include "../../../query/inc/tsqlfunction.h"
#include "tsdb.h"
#include "tsdbFile.h"
#include "tsdbMeta.h"
......@@ -413,6 +416,328 @@ SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList,
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {}
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {}
SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {}
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {}
static SArray* createTableIdArrayList(struct STsdbRepo* tsdb, int64_t uid) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
assert(pTable != NULL); //assert pTable is a super table
size_t size = tSkipListGetSize(pTable->pIndex);
SArray* pList = taosArrayInit(size, sizeof(STableId));
SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex);
while(tSkipListIterNext(iter)) {
STable* t = *(STable**) tSkipListIterGet(iter);
taosArrayPush(pList, &t->tableId);
}
return pList;
}
typedef struct SSyntaxTreeFilterSupporter {
SSchema* pTagSchema;
int32_t numOfTags;
int32_t optr;
} SSyntaxTreeFilterSupporter;
/**
* convert the result pointer to STabObj instead of tSkipListNode
* @param pRes
*/
static void tansformQueryResult(SArray* pRes) {
if (pRes == NULL || taosArrayGetSize(pRes) == 0) {
return;
}
size_t size = taosArrayGetSize(pRes);
for (int32_t i = 0; i < size; ++i) {
// pRes->pRes[i] = ((tSkipListNode*)(pRes->pRes[i]))->pData;
}
}
void tSQLListTraverseDestroyInfo(void* param) {
if (param == NULL) {
return;
}
tQueryInfo* pInfo = (tQueryInfo*)param;
tVariantDestroy(&(pInfo->q));
free(param);
}
static char* convertTagQueryStr(const wchar_t* str, size_t len) {
char* mbs = NULL;
if (len > 0) {
mbs = calloc(1, (len + 1) * TSDB_NCHAR_SIZE);
taosUcs4ToMbs((void*) str, len * TSDB_NCHAR_SIZE, mbs); //todo add log
}
return mbs;
}
static int32_t compareStrVal(const void* pLeft, const void* pRight) {
int32_t ret = strcmp(pLeft, pRight);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
static int32_t compareWStrVal(const void* pLeft, const void* pRight) {
int32_t ret = wcscmp(pLeft, pRight);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
static int32_t compareIntVal(const void* pLeft, const void* pRight) {
DEFAULT_COMP(GET_INT64_VAL(pLeft), GET_INT64_VAL(pRight));
}
static int32_t compareIntDoubleVal(const void* pLeft, const void* pRight) {
DEFAULT_COMP(GET_INT64_VAL(pLeft), GET_DOUBLE_VAL(pRight));
}
static int32_t compareDoubleVal(const void* pLeft, const void* pRight) {
DEFAULT_COMP(GET_DOUBLE_VAL(pLeft), GET_DOUBLE_VAL(pRight));
}
static int32_t compareDoubleIntVal(const void* pLeft, const void* pRight) {
double ret = (*(double*)pLeft) - (*(int64_t*)pRight);
if (fabs(ret) < DBL_EPSILON) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
}
static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'};
const char* pattern = pRight;
const char* str = pLeft;
int32_t ret = patternMatch(pattern, str, strlen(str), &pInfo);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'};
const wchar_t* pattern = pRight;
const wchar_t* str = pLeft;
int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
static __compar_fn_t getFilterComparator(int32_t type, int32_t filterType, int32_t optr) {
__compar_fn_t comparator = NULL;
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_BOOL: {
if (filterType >= TSDB_DATA_TYPE_BOOL && filterType <= TSDB_DATA_TYPE_BIGINT) {
comparator = compareIntVal;
} else if (filterType >= TSDB_DATA_TYPE_FLOAT && filterType <= TSDB_DATA_TYPE_DOUBLE) {
comparator = compareIntDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
if (filterType >= TSDB_DATA_TYPE_BOOL && filterType <= TSDB_DATA_TYPE_BIGINT) {
comparator = compareDoubleIntVal;
} else if (filterType >= TSDB_DATA_TYPE_FLOAT && filterType <= TSDB_DATA_TYPE_DOUBLE) {
comparator = compareDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_BINARY: {
assert(filterType == TSDB_DATA_TYPE_BINARY);
if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */
comparator = compareStrPatternComp;
} else { /* normal relational comparator */
comparator = compareStrVal;
}
break;
}
case TSDB_DATA_TYPE_NCHAR: {
assert(filterType == TSDB_DATA_TYPE_NCHAR);
if (optr == TSDB_RELATION_LIKE) {
comparator = compareWStrPatternComp;
} else {
comparator = compareWStrVal;
}
break;
}
default:
comparator = compareIntVal;
break;
}
return comparator;
}
static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pSchema, int32_t* index,
int32_t* offset) {
*index = 0;
*offset = 0;
// filter on table name(TBNAME)
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
*index = TSDB_TBNAME_COLUMN_INDEX;
*offset = TSDB_TBNAME_COLUMN_INDEX;
return;
}
while ((*index) < pSupporter->numOfTags) {
if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes &&
pSupporter->pTagSchema[*index].type == pSchema->type &&
strcmp(pSupporter->pTagSchema[*index].name, pSchema->name) == 0) {
break;
} else {
(*offset) += pSupporter->pTagSchema[(*index)++].bytes;
}
}
}
void filterPrepare(void* expr, void* param) {
tSQLBinaryExpr *pExpr = (tSQLBinaryExpr*) expr;
if (pExpr->info != NULL) {
return;
}
int32_t i = 0, offset = 0;
pExpr->info = calloc(1, sizeof(tQueryInfo));
tQueryInfo* pInfo = pExpr->info;
SSyntaxTreeFilterSupporter* pSupporter = (SSyntaxTreeFilterSupporter*)param;
tVariant* pCond = pExpr->pRight->pVal;
SSchema* pSchema = pExpr->pLeft->pSchema;
getTagColumnInfo(pSupporter, pSchema, &i, &offset);
assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX));
assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX));
pInfo->sch = *pSchema;
pInfo->colIdx = i;
pInfo->optr = pExpr->nSQLBinaryOptr;
pInfo->offset = offset;
pInfo->compare = getFilterComparator(pSchema->type, pCond->nType, pInfo->optr);
tVariantAssign(&pInfo->q, pCond);
tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
}
bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*)param;
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char buf[TSDB_MAX_TAGS_LEN] = {0};
char* val = dataRowTuple(pTable->tagVal); // todo not only the first column
int8_t type = pInfo->sch.type;
int32_t ret = 0;
if (pInfo->q.nType == TSDB_DATA_TYPE_BINARY || pInfo->q.nType == TSDB_DATA_TYPE_NCHAR) {
ret = pInfo->compare(val, pInfo->q.pz);
} else {
tVariant t = {0};
tVariantCreateFromBinary(&t, val, (uint32_t) pInfo->sch.bytes, type);
ret = pInfo->compare(&t.i64Key, &pInfo->q.i64Key);
}
switch (pInfo->optr) {
case TSDB_RELATION_EQUAL: {
return ret == 0;
}
case TSDB_RELATION_NOT_EQUAL: {
return ret != 0;
}
case TSDB_RELATION_LARGE_EQUAL: {
return ret >= 0;
}
case TSDB_RELATION_LARGE: {
return ret > 0;
}
case TSDB_RELATION_LESS_EQUAL: {
return ret <= 0;
}
case TSDB_RELATION_LESS: {
return ret < 0;
}
case TSDB_RELATION_LIKE: {
return ret == 0;
}
default:
assert(false);
}
return true;
}
static int32_t mgmtFilterMeterByIndex(STable* pSTable, SArray* pRes, const char* pCond) {
STColumn* stcol = schemaColAt(pSTable->tagSchema, 0);
tSQLBinaryExpr* pExpr = NULL;
tSQLBinaryExprFromString(&pExpr, stcol, schemaNCols(pSTable->tagSchema), pCond, strlen(pCond));
// failed to build expression, no result, return immediately
if (pExpr == NULL) {
mError("table:%" PRIu64 ", no result returned, error in super table query expression:%s", pSTable->tableId.uid, pCond);
tfree(pCond);
return TSDB_CODE_OPS_NOT_SUPPORT;
}
// query according to the binary expression
SSyntaxTreeFilterSupporter s = {.pTagSchema = stcol, .numOfTags = schemaNCols(pSTable->tagSchema)};
SBinaryFilterSupp supp = {.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback,
.setupInfoFn = (__do_filter_suppl_fn_t)filterPrepare,
.pExtInfo = &s};
tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp);
tSQLBinaryExprDestroy(&pExpr, tSQLListTraverseDestroyInfo);
tansformQueryResult(pRes);
return TSDB_CODE_SUCCESS;
}
SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) {
// no condition, all tables created according to the stable will involved in querying
if (pTagCond == NULL || wcslen(pTagCond) == 0) {
return createTableIdArrayList(tsdb, uid);
} else {
char* str = convertTagQueryStr(pTagCond, len);
SArray* result = taosArrayInit(8, POINTER_BYTES);
STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
assert(pSTable != NULL);
if (mgmtFilterMeterByIndex(pSTable, result, str) == TSDB_CODE_SUCCESS) {
return result;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册