提交 33dbf051 编写于 作者: H hjxilinx

[td-98] fix bugs for group by + tag filter query

上级 0d2b5ca6
......@@ -47,7 +47,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM;
pSql->maxRetry = 1;
pSql->fp = fp;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
......
......@@ -825,7 +825,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg);
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols != 0) {
if (pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
......
......@@ -774,8 +774,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (pRes == NULL || pRes->qhandle == 0) {
/* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
STscObj* pTscObj = pSql->pTscObj;
if (tscShouldFreeAsyncSqlObj(pSql)) {
if (pTscObj->pSql != pSql) {
tscTrace("%p SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql);
} else {
......
......@@ -1207,7 +1207,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
// all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
pPObj->res.code = -(pState->code);
pPObj->res.code = pState->code;
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
......@@ -1336,12 +1336,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
// SVnodeSidList *vnodeInfo = 0;
// SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
......@@ -1384,11 +1378,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
pthread_mutex_unlock(&trsupport->queryMutex);
} else { // all data has been retrieved to client
tscAllDataRetrievedFromDnode(trsupport, pSql);
}
pthread_mutex_unlock(&trsupport->queryMutex);
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
......@@ -1479,10 +1473,15 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscHandleSubqueryError(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack(param, pSql, 0);
} else {
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
}
}
......
......@@ -47,7 +47,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *);
*
*/
typedef struct tQueryInfo {
int32_t offset; // offset value in tags
int32_t colIndex; // index of column in schema
uint8_t optr; // expression operator
SSchema sch; // schema of tags
......
......@@ -544,7 +544,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &pCond->end->v.i64Key, type, TSDB_ORDER_DESC);
}
__compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type);
__compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type, 0);
if (pCond->start != NULL) {
int32_t optr = pCond->start->optr;
......@@ -569,7 +569,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
if (comp) {
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key);
assert(ret <= 0);
assert(ret >= 0);
}
if (ret == 0 && optr == TSDB_RELATION_GREATER) {
......@@ -595,8 +595,8 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) {
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key);
assert(ret >= 0);
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->end->v.i64Key);
assert(ret <= 0);
}
if (ret == 0 && optr == TSDB_RELATION_LESS) {
......
......@@ -5432,8 +5432,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx);
pQueryMsg->orderType = htons(pQueryMsg->orderType);
pMsg += sizeof(SColIndex) * pQueryMsg->numOfGroupCols;
}
pQueryMsg->interpoType = htons(pQueryMsg->interpoType);
......
......@@ -18,6 +18,7 @@
#include "talgo.h"
#include "tlog.h"
#include "tutil.h"
#include "tcompare.h"
#include "../../../query/inc/qast.h"
#include "../../../query/inc/qextbuffer.h"
......@@ -1170,7 +1171,7 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) {
size_t size = taosArrayGetSize(pTableList);
for (int32_t i = 0; i < size; ++i) {
STable* pTable = taosArrayGetP(pTableList, i);
taosArrayPush(pRes, &pTable->tableId);
taosArrayPush(pRes, &pTable);
}
}
......@@ -1184,24 +1185,20 @@ static void destroyHelper(void* param) {
free(param);
}
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index, int32_t* offset) {
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index) {
*index = 0;
*offset = 0;
// filter on table name(TBNAME)
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
*index = TSDB_TBNAME_COLUMN_INDEX;
*offset = TSDB_TBNAME_COLUMN_INDEX;
return;
}
while ((*index) < pSupporter->numOfTags) {
if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes &&
pSupporter->pTagSchema[*index].type == pSchema->type &&
pSupporter->pTagSchema[*index].type == pSchema->type &&
pSupporter->pTagSchema[*index].colId == pSchema->colId) {
break;
} else {
(*offset) += pSupporter->pTagSchema[(*index)++].bytes;
}
}
}
......@@ -1222,15 +1219,14 @@ void filterPrepare(void* expr, void* param) {
tVariant* pCond = pExpr->_node.pRight->pVal;
SSchema* pSchema = pExpr->_node.pLeft->pSchema;
getTagColumnInfo(pSupporter, pSchema, &i, &offset);
getTagColumnInfo(pSupporter, pSchema, &i);
assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX));
assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX));
pInfo->sch = *pSchema;
pInfo->sch = *pSchema;
pInfo->colIndex = i;
pInfo->optr = pExpr->_node.optr;
pInfo->offset = offset;
// pInfo->compare = getFilterComparator(pSchema->type, pCond->nType, pInfo->optr);
pInfo->optr = pExpr->_node.optr;
pInfo->compare = getComparFunc(pSchema->type, pCond->nType, pInfo->optr);
tVariantAssign(&pInfo->q, pCond);
tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
......@@ -1329,6 +1325,8 @@ void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables,
taosArrayPush(g, &p1);
}
}
taosArrayPush(pGroups, &g);
}
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
......@@ -1380,7 +1378,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*)param;
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = dataRowTuple(pTable->tagVal); // todo not only the first column
int8_t type = pInfo->sch.type;
......@@ -1437,7 +1435,8 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
SExprTreeSupporter s = {.pTagSchema = schema, .numOfTags = schemaNCols(pSTable->tagSchema)};
SBinaryFilterSupp supp = {
.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s};
.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s,
};
SArray* pTableList = taosArrayInit(8, POINTER_BYTES);
......
......@@ -34,31 +34,13 @@ typedef struct SPatternCompareInfo {
char matchOne; // symbol for match one wildcard, default: '_'
} SPatternCompareInfo;
int32_t compareInt32Val(const void *pLeft, const void *pRight);
int32_t compareInt64Val(const void *pLeft, const void *pRight);
int32_t compareInt16Val(const void *pLeft, const void *pRight);
int32_t compareInt8Val(const void *pLeft, const void *pRight);
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight);
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight);
int32_t compareDoubleVal(const void *pLeft, const void *pRight);
int32_t compareStrVal(const void *pLeft, const void *pRight);
int32_t compareWStrVal(const void *pLeft, const void *pRight);
int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo);
int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo);
__compar_fn_t getKeyComparFunc(int32_t keyType);
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType);
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr);
#ifdef __cplusplus
}
......
......@@ -40,25 +40,23 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) {
}
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) {
// int64_t lhs = ((SSkipListKey *)pLeft)->i64Key;
// double rhs = ((SSkipListKey *)pRight)->dKey;
// if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0;
// } else {
// return (lhs > rhs) ? 1 : -1;
// }
return 0;
int64_t lhs = GET_INT64_VAL(pLeft);
double rhs = GET_DOUBLE_VAL(pRight);
if (fabs(lhs - rhs) < FLT_EPSILON) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
}
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) {
// double lhs = ((SSkipListKey *)pLeft)->dKey;
// int64_t rhs = ((SSkipListKey *)pRight)->i64Key;
// if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0;
// } else {
// return (lhs > rhs) ? 1 : -1;
// }
return 0;
double lhs = GET_DOUBLE_VAL(pLeft);
int64_t rhs = GET_INT64_VAL(pRight);
if (fabs(lhs - rhs) < FLT_EPSILON) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
}
int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
......@@ -241,7 +239,8 @@ static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void*
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
// todo promote the type definition before the comparsion
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr) {
__compar_fn_t comparFn = NULL;
switch (type) {
......@@ -250,10 +249,15 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: {
// assert(type == filterDataType);
if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) {
comparFn = compareInt64Val;
break;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparFn = compareIntDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_BOOL: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
......@@ -265,22 +269,37 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
// comparFn = compareDoubleIntVal;
// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
// comparFn = compareDoubleVal;
// }
if (filterDataType == TSDB_DATA_TYPE_DOUBLE) {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparFn = compareDoubleIntVal;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparFn = compareDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_BINARY:
comparFn = compareStrVal;
case TSDB_DATA_TYPE_BINARY: {
assert(filterDataType == TSDB_DATA_TYPE_BINARY);
if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */
comparFn = compareStrPatternComp;
} else { /* normal relational comparFn */
comparFn = compareStrVal;
}
break;
case TSDB_DATA_TYPE_NCHAR:
comparFn = compareWStrVal;
}
case TSDB_DATA_TYPE_NCHAR: {
assert(filterDataType == TSDB_DATA_TYPE_NCHAR);
if (optr == TSDB_RELATION_LIKE) {
comparFn = compareWStrPatternComp;
} else {
comparFn = compareWStrVal;
}
break;
}
default:
comparFn = compareInt32Val;
break;
......
......@@ -308,7 +308,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
pSkipList->state.queryCount++;
#endif
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType);
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType, 0);
int32_t ret = -1;
for (int32_t i = sLevel; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
......@@ -372,7 +372,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
int32_t ret = -1;
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type);
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type, 0);
SSkipListNode* pNode = pSkipList->pHead;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册