未验证 提交 945024bf 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1641 from taosdata/feature/query

Feature/query
......@@ -71,6 +71,7 @@ typedef struct STableMeta {
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
SVgroupsInfo* vgroupList;
/*
* 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion
......
......@@ -47,7 +47,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM;
pSql->maxRetry = 1;
pSql->fp = fp;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
......
......@@ -651,19 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query super table
int32_t index = pTableMetaInfo->vgroupIndex;
if (pTableMetaInfo->vgroupIndex < 0) {
tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vgroupIndex);
if (index < 0) {
tscError("%p error vgroupIndex:%d", pSql, index);
return -1;
}
pSql->ipList.numOfIps = 1; // todo fix me
SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
pSql->ipList.numOfIps = pVgroupInfo->numOfIps; // todo fix me
pSql->ipList.port = tsDnodeShellPort;
pSql->ipList.inUse = 0;
// todo extract method
SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[pTableMetaInfo->vgroupIndex];
pSql->ipList.ip[0] = pVgroupInfo->ipAddr[0].ip;
for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
pSql->ipList.ip[i] = pVgroupInfo->ipAddr[i].ip;
}
#if 0
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
......@@ -676,8 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
#endif
tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups,
pTableMetaInfo->vgroupIndex);
tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, index);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
numOfTables = 1;
......@@ -823,7 +825,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg);
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols != 0) {
if (pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
......@@ -2133,10 +2135,11 @@ _error_clean:
for(int32_t i = 0; i < pInfo->vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pVgroups = &pInfo->vgroupList->vgroups[i];
pVgroups->numOfIps = htonl(pVgroups->numOfIps);
pVgroups->vgId = htonl(pVgroups->vgId);
assert(pVgroups->numOfIps >= 1);
for(int32_t j = 0; j < tListLen(pVgroups->ipAddr); ++j) {
for(int32_t j = 0; j < pVgroups->numOfIps; ++j) {
pVgroups->ipAddr[j].ip = htonl(pVgroups->ipAddr[j].ip);
pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port);
}
......
......@@ -690,7 +690,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC ||
pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_DESCRIBE_TABLE))) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
sem_wait(&pSql->rspSem);
......@@ -773,8 +774,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (pRes == NULL || pRes->qhandle == 0) {
/* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
STscObj* pTscObj = pSql->pTscObj;
if (tscShouldFreeAsyncSqlObj(pSql)) {
if (pTscObj->pSql != pSql) {
tscTrace("%p SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql);
} else {
......
......@@ -1207,7 +1207,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
// all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
pPObj->res.code = -(pState->code);
pPObj->res.code = pState->code;
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
......@@ -1336,12 +1336,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
// SVnodeSidList *vnodeInfo = 0;
// SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
......@@ -1384,11 +1378,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
pthread_mutex_unlock(&trsupport->queryMutex);
} else { // all data has been retrieved to client
tscAllDataRetrievedFromDnode(trsupport, pSql);
}
pthread_mutex_unlock(&trsupport->queryMutex);
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
......@@ -1479,10 +1473,15 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscHandleSubqueryError(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack(param, pSql, 0);
} else {
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
}
}
......
......@@ -2128,27 +2128,23 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if (pPrevSql == NULL) {
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
// SSuperTableMeta* pMetricMeta = NULL;
// if (cmd == TSDB_SQL_SELECT) {
// pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
// }
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object.
// STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
// STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
} else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
// pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags,
// pTableMetaInfo->tagColumnIndex);
STableMeta* pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pPrevInfo->vgroupList = NULL;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex);
}
assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
// assert(pFinalInfo->pMetricMeta != NULL);
assert(pFinalInfo->vgroupList != NULL);
}
if (cmd == TSDB_SQL_SELECT) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERY_H
#define TDENGINE_QUERY_H
#ifdef __cplusplus
extern "C" {
#endif
typedef void* qinfo_t;
/**
* create the qinfo object according to QueryTableMsg
* @param pVnode
* @param pQueryTableMsg
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
/**
* Destroy QInfo object
*
* @param qinfo
* @return
*/
void qDestroyQueryInfo(qinfo_t qinfo);
/**
* the main query execution function, including query on both table and multitables,
* which are decided according to the tag or table name query conditions
*
* @param qinfo
* @return
*/
void qTableQuery(qinfo_t qinfo);
/**
* Retrieve the produced results information, if current query is not paused or completed,
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param qinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo);
/**
*
* Retrieve the actual results to fill the response message payload.
* Note that this function must be executed after qRetrieveQueryResultInfo is invoked.
*
* @param qinfo qinfo object
* @param pRsp response message
* @param contLen payload length
* @return
*/
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
/**
* Decide if more results will be produced or not
*
* @param qinfo
* @return
*/
bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QUERY_H
......@@ -92,6 +92,8 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
void tsdbClearTableCfg(STableCfg *config);
int32_t tsdbGetTableTagVal(tsdb_repo_t *repo, STableId id, int32_t col, int16_t* type, int16_t* bytes, char** val);
int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
......@@ -158,11 +160,16 @@ typedef struct SDataBlockInfo {
int32_t sid;
} SDataBlockInfo;
typedef struct {
size_t numOfTables;
SArray* pGroupList;
} STableGroupInfo;
typedef struct {
} SFields;
#define TSDB_TS_GREATER_EQUAL 1
#define TSDB_TS_LESS_EQUAL 2
#define TSDB_TS_LESS_EQUAL 2
typedef struct SQueryRowCond {
int32_t rel;
......@@ -177,7 +184,7 @@ typedef void *tsdbpos_t;
* @param pTableList table sid list
* @return
*/
tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t *tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo);
tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, SArray *pColumnInfo);
/**
* move to next block
......@@ -279,10 +286,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
* @param pTagCond. tag query condition
*
*/
int32_t tsdbQueryTags(tsdb_repo_t *tsdb, int64_t uid, const char *pTagCond, size_t len, SArray **pGroupList,
SColIndex *pColIndex, int32_t numOfCols);
int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupList,
SColIndex* pColIndex, int32_t numOfCols);
int32_t tsdbGetOneTableGroup(tsdb_repo_t *tsdb, int64_t uid, SArray **pGroupList);
int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo);
/**
* clean up the query handle
......
......@@ -47,7 +47,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *);
*
*/
typedef struct tQueryInfo {
int32_t offset; // offset value in tags
int32_t colIndex; // index of column in schema
uint8_t optr; // expression operator
SSchema sch; // schema of tags
......
......@@ -18,6 +18,7 @@
#include "os.h"
#include "hash.h"
#include "tsdb.h"
#include "qinterpolation.h"
#include "qresultBuf.h"
#include "qsqlparser.h"
......@@ -89,7 +90,7 @@ typedef struct SColumnFilterElem {
} SColumnFilterElem;
typedef struct SSingleColumnFilterInfo {
SColumnInfoData info;
SColumnInfoData info;
int32_t numOfFilters;
SColumnFilterElem* pFilters;
void* pData;
......@@ -108,8 +109,6 @@ typedef struct STableQueryInfo {
} STableQueryInfo;
typedef struct STableDataInfo {
int32_t numOfBlocks;
int32_t start; // start block index
int32_t tableIndex;
int32_t groupIdx; // group id in table list
STableQueryInfo* pTableQInfo;
......@@ -171,7 +170,7 @@ typedef struct SQInfo {
int32_t pointsInterpo;
int32_t code; // error code to returned to client
sem_t dataReady;
SArray* pTableList; // table id list
STableGroupInfo groupInfo; // table id list
void* tsdb;
SQueryRuntimeEnv runtimeEnv;
......@@ -187,50 +186,7 @@ typedef struct SQInfo {
*/
int32_t tableIndex;
int32_t numOfGroupResultPages;
STableDataInfo* pTableDataInfo;
TSKEY* tsList;
} SQInfo;
/**
* create the qinfo object before adding the query task to each tsdb query worker
*
* @param pReadMsg
* @param pQInfo
* @return
*/
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/**
* destroy the query info struct
* @param pQInfo
*/
void qDestroyQueryInfo(SQInfo* pQInfo);
/**
* query on single table
* @param pReadMsg
*/
void qTableQuery(SQInfo* pQInfo);
/**
* wait for the query completed, and retrieve final results to client
* @param pQInfo
*/
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo);
/**
*
* @param pQInfo
* @param pRsp
* @return
*/
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
/**
*
* @param pQInfo
* @return
*/
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo);
#endif // TDENGINE_QUERYEXECUTOR_H
......@@ -544,7 +544,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &pCond->end->v.i64Key, type, TSDB_ORDER_DESC);
}
__compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type);
__compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type, 0);
if (pCond->start != NULL) {
int32_t optr = pCond->start->optr;
......@@ -569,7 +569,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
if (comp) {
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key);
assert(ret <= 0);
assert(ret >= 0);
}
if (ret == 0 && optr == TSDB_RELATION_GREATER) {
......@@ -595,8 +595,8 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) {
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key);
assert(ret >= 0);
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->end->v.i64Key);
assert(ret <= 0);
}
if (ret == 0 && optr == TSDB_RELATION_LESS) {
......
......@@ -27,7 +27,8 @@
#include "qresultBuf.h"
#include "queryExecutor.h"
#include "queryUtil.h"
#include "tsdb.h"
#include "query.h"
#include "tsdbMain.h" //todo use TableId instead of STable object
#define DEFAULT_INTERN_BUF_SIZE 16384L
......@@ -92,7 +93,7 @@ enum {
TS_JOIN_TAG_NOT_EQUALS = 2,
};
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end);
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group);
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
......@@ -104,7 +105,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData,
int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
static int32_t setAdditionalInfo(SQInfo *pQInfo, int32_t meterIdx, STableQueryInfo *pTableQueryInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery);
......@@ -2185,7 +2186,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
num = 128;
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
size_t s = taosArrayGetSize(pQInfo->pTableList);
size_t s = pQInfo->groupInfo.numOfTables;
num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset
num = 1;//pQInfo->pSidSet->numOfSubSet;
......@@ -2253,7 +2254,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
// get one queried meter
assert(0);
// SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[0]->sid);
// SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid);
pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1;
......@@ -2298,10 +2299,8 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
.colList = pQuery->colList,
};
SArray *sa = taosArrayInit(1, POINTER_BYTES);
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
// SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[i]->sid);
// SMeterObj *p1 = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
// taosArrayPush(sa, &p1);
// }
......@@ -2310,7 +2309,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
taosArrayPush(cols, &pQuery->colList[i]);
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, sa, cols);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo, cols);
// metric query do not invoke interpolation, it will be done at the second-stage merge
if (!isPointInterpoQuery(pQuery)) {
......@@ -2331,18 +2330,18 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
*/
void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
if (pQInfo != NULL) {
// assert(taosHashGetSize(pQInfo->pTableList) >= 1);
// assert(taosHashGetSize(pQInfo->groupInfo) >= 1);
}
#if 0
if (pQInfo == NULL || pQInfo->numOfTables == 1) {
if (pQInfo == NULL || pQInfo->groupInfo.numOfTables == 1) {
atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1);
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode,
pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries);
} else {
int32_t num = 0;
for (int32_t i = 0; i < pQInfo->numOfTables; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[i]->sid);
for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
......@@ -2356,9 +2355,9 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
* in order to reduce log output, for all meters of which numOfQueries count are 0,
* we do not output corresponding information
*/
num = pQInfo->numOfTables - num;
num = pQInfo->groupInfo.numOfTables - num;
dTrace("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo,
pQInfo->numOfTables, num);
pQInfo->groupInfo.numOfTables, num);
}
#endif
}
......@@ -2663,32 +2662,27 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, void *pMeterSidInfo, tVariant *param) {
static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColIdx, tVariant *param) {
assert(tagColIdx >= 0);
#if 0
int16_t offset = getColumnModelOffset(pTagSchema, tagColIdx);
void * pStr = (char *)pMeterSidInfo->tags + offset;
SSchema *pCol = getColumnModelSchema(pTagSchema, tagColIdx);
tVariantDestroy(param);
if (isNull(pStr, pCol->type)) {
param->nType = TSDB_DATA_TYPE_NULL;
} else {
tVariantCreateFromBinary(param, pStr, pCol->bytes, pCol->type);
}
#endif
char* val = NULL;
int16_t bytes = 0;
int16_t type = 0;
tsdbGetTableTagVal(tsdb, id, tagColIdx, &type, &bytes, &val);
tVariantCreateFromBinary(param, val, bytes, type);
}
void vnodeSetTagValueInParam(STableGroupList *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, void *pMeterSidInfo) {
void vnodeSetTagValueInParam(STableGroupInfo *groupList, SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel;
// SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel;
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
assert(pFuncMsg->numOfParams == 1);
doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag);
doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag);
} else {
// set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutputCols; ++idx) {
......@@ -2699,14 +2693,14 @@ void vnodeSetTagValueInParam(STableGroupList *pSidSet, SQueryRuntimeEnv *pRuntim
continue;
}
doSetTagValueInParam(pTagSchema, pColEx->colIndex, pMeterSidInfo, &pRuntimeEnv->pCtx[idx].tag);
doSetTagValueInParam(tsdb, id, pColEx->colIndex, &pRuntimeEnv->pCtx[idx].tag);
}
// set the join tag for first column
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pRuntimeEnv->pTSBuf != NULL) {
assert(pFuncMsg->numOfParams == 1);
doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag);
// doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag);
}
}
}
......@@ -2915,10 +2909,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int32_t ret = TSDB_CODE_SUCCESS;
// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
int32_t end = taosArrayGetSize(pQInfo->pTableList) - 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
// int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
// int32_t end = pQInfo->groupInfo.numOfTables - 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
ret = mergeIntoGroupResultImpl(pQInfo, pQInfo->pTableDataInfo, start, end);
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
ret = mergeIntoGroupResultImpl(pQInfo, group);
if (ret < 0) { // not enough disk space to save the data into disk
return -1;
}
......@@ -3015,22 +3010,26 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
return maxOutput;
}
int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end) {
int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* pGroup) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
tFilePage ** buffer = (tFilePage **)pQuery->sdata;
int32_t * posList = calloc((end - start), sizeof(int32_t));
STableDataInfo **pTableList = malloc(POINTER_BYTES * (end - start));
size_t size = taosArrayGetSize(pGroup);
tFilePage **buffer = (tFilePage **)pQuery->sdata;
int32_t *posList = calloc(size, sizeof(int32_t));
STableDataInfo **pTableList = malloc(POINTER_BYTES * size);
// todo opt for the case of one table per group
int32_t numOfTables = 0;
for (int32_t i = start; i < end; ++i) {
int32_t tid = pTableDataInfo[i].pTableQInfo->tid;
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tid);
if (list.size > 0 && pTableDataInfo[i].pTableQInfo->windowResInfo.size > 0) {
pTableList[numOfTables] = &pTableDataInfo[i];
for (int32_t i = 0; i < size; ++i) {
SPair* p = taosArrayGet(pGroup, i);
STableQueryInfo* pInfo = p->sec;
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->tid);
if (list.size > 0 && pInfo->windowResInfo.size > 0) {
// pTableList[numOfTables] = &pTableDataInfo[i];
numOfTables += 1;
}
}
......@@ -3258,14 +3257,12 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
}
if (isIntervalQuery(pQuery)) {
size_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
}
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
//
// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
// }
} else {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
......@@ -3557,7 +3554,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
if (pRuntimeEnv->pSecQueryHandle != NULL) {
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, pQInfo->pTableList, cols);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo, cols);
}
taosArrayDestroy(cols);
......@@ -3654,7 +3651,7 @@ STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid
return pTableQueryInfo;
}
void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) {
UNUSED_FUNC void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) {
if (pTableQueryInfo == NULL) {
return;
}
......@@ -3700,7 +3697,7 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
* @param pRuntimeEnv
* @param pDataBlockInfo
*/
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, int32_t meterIdx, int32_t groupIdx,
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STable* pTable, int32_t groupIdx,
TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
......@@ -3726,7 +3723,7 @@ void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, int32
initCtxOutputBuf(pRuntimeEnv);
pTableQueryInfo->lastKey = nextKey;
setAdditionalInfo(pQInfo, meterIdx, pTableQueryInfo);
setAdditionalInfo(pQInfo, pTable, pTableQueryInfo);
}
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) {
......@@ -3754,11 +3751,11 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
}
}
int32_t setAdditionalInfo(SQInfo *pQInfo, int32_t meterIdx, STableQueryInfo *pTableQueryInfo) {
int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
assert(pTableQueryInfo->lastKey > 0);
// vnodeSetTagValueInParam(pQInfo->pSidSet, pRuntimeEnv, pQInfo->pMeterSidExtInfo[meterIdx]);
vnodeSetTagValueInParam(&pQInfo->groupInfo, pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
// both the master and supplement scan needs to set the correct ts comp start position
if (pRuntimeEnv->pTSBuf != NULL) {
......@@ -3866,7 +3863,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
totalSubset = 1;//pQInfo->pSidSet->numOfSubSet;
totalSubset = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
}
return totalSubset;
......@@ -4193,7 +4190,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
taosArrayPush(cols, &pQuery->colList[i]);
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, pQInfo->pTableList, cols);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo, cols);
taosArrayDestroy(cols);
pQInfo->tsdb = tsdb;
......@@ -4294,7 +4291,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
return TSDB_CODE_SUCCESS;
}
static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupList *pSidset) {
static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupInfo *pSidset) {
if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
return false;
}
......@@ -4335,7 +4332,6 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
int64_t st = taosGetTimestampMs();
size_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
......@@ -4345,15 +4341,26 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
STableDataInfo* pTableDataInfo = NULL;
STable* pTable = NULL;
// todo opt performance
for(int32_t i = 0; i < numOfTables; ++i) {
if (pQInfo->pTableDataInfo[i].pTableQInfo->tid == blockInfo.sid) {
pTableDataInfo = &pQInfo->pTableDataInfo[i];
break;
// todo opt performance using hash table
size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
for(int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
size_t num = taosArrayGetSize(group);
for(int32_t j = 0; j < num; ++j) {
SPair* p = taosArrayGet(group, j);
STableDataInfo* pInfo = p->sec;
if (pInfo->pTableQInfo->tid == blockInfo.sid) {
pTableDataInfo = p->sec;
pTable = p->first;
break;
}
}
}
assert(pTableDataInfo != NULL && pTableDataInfo->pTableQInfo != NULL);
STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo;
......@@ -4364,10 +4371,10 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
TSKEY nextKey = blockInfo.window.ekey;
if (!isIntervalQuery(pQuery)) {
setExecutionContext(pQInfo, pTableQueryInfo, pTableDataInfo->tableIndex, pTableDataInfo->groupIdx, nextKey);
setExecutionContext(pQInfo, pTableQueryInfo, pTable, pTableDataInfo->groupIdx, nextKey);
} else { // interval query
setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey);
int32_t ret = setAdditionalInfo(pQInfo, pTableDataInfo->tableIndex, pTableQueryInfo);
int32_t ret = setAdditionalInfo(pQInfo, pTable, pTableQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
......@@ -4493,7 +4500,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED);
#if 0
// STableGroupList *pTableIdList = pSupporter->pSidSet;
// STableGroupInfo *pTableIdList = pSupporter->pSidSet;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
......@@ -4747,35 +4754,32 @@ static void createTableDataInfo(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
if (pQInfo->pTableDataInfo == NULL) {
pQInfo->pTableDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * numOfTables);
if (pQInfo->pTableDataInfo == NULL) {
dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
return;
}
size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
int32_t index = 0;
for (int32_t i = 0; i < numOfGroups; ++i) { // load all meter meta info
SArray *group = *(SArray**) taosArrayGet(pQInfo->groupInfo.pGroupList, i);
int32_t groupId = 0;
for (int32_t i = 0; i < numOfTables; ++i) { // load all meter meta info
STableId *id = taosArrayGet(pQInfo->pTableList, i);
STableDataInfo *pInfo = &pQInfo->pTableDataInfo[i];
size_t s = taosArrayGetSize(group);
for(int32_t j = 0; j < s; ++j) {
SPair* p = (SPair*) taosArrayGet(group, j);
STableDataInfo* pInfo = calloc(1, sizeof(STableDataInfo));
setTableDataInfo(pInfo, index, i);
pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, ((STable*)(p->first))->tableId.tid, pQuery->window);
setTableDataInfo(pInfo, i, groupId);
pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, id->tid, pQuery->window);
p->sec = pInfo;
index += 1;
}
}
}
static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
// SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
}
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
// }
}
static void doSaveContext(SQInfo* pQInfo) {
......@@ -4809,12 +4813,23 @@ static void doRestoreContext(SQInfo* pQInfo) {
static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
size_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
if (isIntervalQuery(pQuery)) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
closeAllTimeWindow(&pTableQueryInfo->windowResInfo);
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// closeAllTimeWindow(&pTableQueryInfo->windowResInfo);
// }
size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
for(int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
size_t num = taosArrayGetSize(group);
for(int32_t j = 0; j < num; ++j) {
SPair* p = taosArrayGet(group, j);
STableDataInfo* pInfo = p->sec;
closeAllTimeWindow(&pInfo->pTableQInfo->windowResInfo);
}
}
} else { // close results for group result
closeAllTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
......@@ -5140,15 +5155,15 @@ static void tableQueryImpl(SQInfo* pQInfo) {
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
assert(taosArrayGetSize(pQInfo->pTableList) == 1);
assert(pQInfo->groupInfo.numOfTables == 1);
/* check if query is killed or not */
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo);
} else {
STableId* pTableId = taosArrayGet(pQInfo->pTableList, 0);
dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
// STableId* pTableId = taosArrayGet(pQInfo->groupInfo, 0);
// dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
// pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
sem_post(&pQInfo->dataReady);
......@@ -5175,8 +5190,7 @@ static void stableQueryImpl(SQInfo* pQInfo) {
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType);
if (pQuery->rec.rows == 0) {
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, numOfTables, pQuery->rec.total);
dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables, pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
}
......@@ -5410,8 +5424,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx);
pQueryMsg->orderType = htons(pQueryMsg->orderType);
pMsg += sizeof(SColIndex) * pQueryMsg->numOfGroupCols;
}
pQueryMsg->interpoType = htons(pQueryMsg->interpoType);
......@@ -5710,10 +5722,10 @@ static void doUpdateExprColumnIndex(SQuery* pQuery) {
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs,
SArray *pTableList) {
STableGroupInfo *groupInfo) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
goto _clean_pQInfo_memory;
return NULL;
}
SQuery *pQuery = calloc(1, sizeof(SQuery));
......@@ -5808,7 +5820,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
pQInfo->pTableList = pTableList;
pQInfo->groupInfo = *groupInfo;
pQuery->pos = -1;
......@@ -5842,7 +5854,6 @@ _clean_memory:
tfree(pExprs);
tfree(pGroupbyExpr);
_clean_pQInfo_memory:
tfree(pQInfo);
return NULL;
......@@ -5918,14 +5929,12 @@ static void freeQInfo(SQInfo *pQInfo) {
sem_destroy(&(pQInfo->dataReady));
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableList);
for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
}
tfree(pQInfo->pTableDataInfo);
// if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->groupInfo);
// for (int32_t j = 0; j < 0; ++j) {
// destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
// }
// }
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
......@@ -5958,7 +5967,7 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->pGroupbyExpr);
tfree(pQuery);
taosArrayDestroy(pQInfo->pTableList);
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
dTrace("QInfo:%p QInfo is freed", pQInfo);
......@@ -6023,7 +6032,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// todo if interpolation exists, the result may be dump to client by several rounds
}
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) {
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) {
assert(pQueryMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -6061,7 +6070,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
}
bool isSTableQuery = false;
SArray* pGroupList = NULL;
STableGroupInfo* groupInfo = calloc(1, sizeof(STableGroupInfo));
if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
isSTableQuery = true;
......@@ -6069,8 +6078,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
STableId* id = taosArrayGet(pTableIdList, 0);
id->uid = -1; //todo fix me
/*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &pGroupList, pGroupColIndex, pQueryMsg->numOfGroupCols);
if (taosArrayGetSize(pGroupList) == 0) { // no qualified tables no need to do query
/*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols);
if (groupInfo->numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS;
goto _query_over;
}
......@@ -6078,12 +6087,12 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
assert(taosArrayGetSize(pTableIdList) == 1);
STableId* id = taosArrayGet(pTableIdList, 0);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &pGroupList)) != TSDB_CODE_SUCCESS) {
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, groupInfo)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
}
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pGroupList);
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, groupInfo);
if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -6113,12 +6122,14 @@ _query_over:
return TSDB_CODE_SUCCESS;
}
void qDestroyQueryInfo(SQInfo* pQInfo) {
void qDestroyQueryInfo(qinfo_t pQInfo) {
dTrace("QInfo:%p query completed", pQInfo);
freeQInfo(pQInfo);
}
void qTableQuery(SQInfo *pQInfo) {
void qTableQuery(qinfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo);
return;
......@@ -6140,7 +6151,9 @@ void qTableQuery(SQInfo *pQInfo) {
// vnodeDecRefCount(pQInfo);
}
int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE;
}
......@@ -6158,7 +6171,9 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
return pQInfo->code;
}
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) {
return false;
}
......@@ -6175,7 +6190,9 @@ bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
}
}
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen) {
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen) {
SQInfo* pQInfo = (SQInfo*) qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE;
}
......
......@@ -214,6 +214,23 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
}
}
int32_t tsdbGetTableTagVal(tsdb_repo_t* repo, STableId id, int32_t col, int16_t* type, int16_t* bytes, char** val) {
STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id.uid);
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn* pCol = schemaColAt(pSchema, col);
SDataRow row = (SDataRow)pTable->tagVal;
char* d = dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
*val = d;
*type = pCol->type;
*bytes = pCol->bytes;
return 0;
}
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
if (tsdbCheckTableCfg(pCfg) < 0) return -1;
......
......@@ -18,11 +18,10 @@
#include "talgo.h"
#include "tlog.h"
#include "tutil.h"
#include "tcompare.h"
#include "../../../query/inc/qast.h"
#include "../../../query/inc/qextbuffer.h"
#include "../../../query/inc/tlosertree.h"
#include "../../../query/inc/tsqlfunction.h"
#include "../../../query/inc/qast.h" // todo move to common module
#include "../../../query/inc/tlosertree.h" // todo move to util module
#include "tsdb.h"
#include "tsdbMain.h"
......@@ -143,7 +142,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileListIndex = -1;
}
tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, SArray* groupList, SArray* pColumnInfo) {
tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, SArray* pColumnInfo) {
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
......@@ -157,22 +156,25 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
pQueryHandle->isFirstSlot = true;
pQueryHandle->cur.fid = -1;
size_t size = taosArrayGetSize(groupList);
assert(size >= 1);
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(sizeOfGroup >= 1);
pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo));
for (int32_t i = 0; i < size; ++i) {
SArray* group = *(SArray**)taosArrayGet(groupList, i);
pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
for (int32_t i = 0; i < sizeOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
size_t gsize = taosArrayGetSize(group);
assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) {
STable* pTable = *(STable**)taosArrayGet(group, j);
assert(pTable != NULL);
SPair* d = (SPair*) taosArrayGet(group, j);
assert(d->first != NULL);
STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey,
.tableId = pTable->tableId,
.pTableObj = pTable,
.tableId = ((STable*) d->first)->tableId,
.pTableObj = d->first,
};
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
......@@ -1143,7 +1145,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
SSkipListNode* pNode = tSkipListIterGet(iter);
STable* t = *(STable**)SL_GET_NODE_DATA(pNode);
taosArrayPush(list, t);
taosArrayPush(list, &t);
}
return TSDB_CODE_SUCCESS;
......@@ -1167,7 +1169,7 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) {
size_t size = taosArrayGetSize(pTableList);
for (int32_t i = 0; i < size; ++i) {
STable* pTable = taosArrayGetP(pTableList, i);
taosArrayPush(pRes, &pTable->tableId);
taosArrayPush(pRes, &pTable);
}
}
......@@ -1181,24 +1183,20 @@ static void destroyHelper(void* param) {
free(param);
}
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index, int32_t* offset) {
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index) {
*index = 0;
*offset = 0;
// filter on table name(TBNAME)
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
*index = TSDB_TBNAME_COLUMN_INDEX;
*offset = TSDB_TBNAME_COLUMN_INDEX;
return;
}
while ((*index) < pSupporter->numOfTags) {
if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes &&
pSupporter->pTagSchema[*index].type == pSchema->type &&
pSupporter->pTagSchema[*index].type == pSchema->type &&
pSupporter->pTagSchema[*index].colId == pSchema->colId) {
break;
} else {
(*offset) += pSupporter->pTagSchema[(*index)++].bytes;
}
}
}
......@@ -1219,15 +1217,14 @@ void filterPrepare(void* expr, void* param) {
tVariant* pCond = pExpr->_node.pRight->pVal;
SSchema* pSchema = pExpr->_node.pLeft->pSchema;
getTagColumnInfo(pSupporter, pSchema, &i, &offset);
getTagColumnInfo(pSupporter, pSchema, &i);
assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX));
assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX));
pInfo->sch = *pSchema;
pInfo->sch = *pSchema;
pInfo->colIndex = i;
pInfo->optr = pExpr->_node.optr;
pInfo->offset = offset;
// pInfo->compare = getFilterComparator(pSchema->type, pCond->nType, pInfo->optr);
pInfo->optr = pExpr->_node.optr;
pInfo->compare = getComparFunc(pSchema->type, pCond->nType, pInfo->optr);
tVariantAssign(&pInfo->q, pCond);
tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
......@@ -1306,26 +1303,33 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
}
void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
SArray* g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTables[0]);
SArray* g = taosArrayInit(16, sizeof(SPair));
SPair p = {.first = pTables[0]};
taosArrayPush(g, &p);
for (int32_t i = 1; i < numOfTables; ++i) {
int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp);
assert(ret == 0 || ret == -1);
if (ret == 0) {
taosArrayPush(g, &pTables[i]);
SPair p1 = {.first = pTables[i]};
taosArrayPush(g, &p1);
} else {
taosArrayPush(pGroups, &g); // current group is ended, start a new group
g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTables[i]);
SPair p1 = {.first = pTables[i]};
taosArrayPush(g, &p1);
}
}
taosArrayPush(pGroups, &g);
}
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
assert(pTableList != NULL && taosArrayGetSize(pTableList) > 0);
assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
size_t size = taosArrayGetSize(pTableList);
......@@ -1335,7 +1339,17 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
taosArrayPush(pTableGroup, pTableList);
size_t num = taosArrayGetSize(pTableList);
SArray* sa = taosArrayInit(num, sizeof(SPair));
for(int32_t i = 0; i < num; ++i) {
STable* pTable = taosArrayGetP(pTableList, i);
SPair p = {.first = pTable};
taosArrayPush(sa, &p);
}
taosArrayPush(pTableGroup, &sa);
pTrace("all %d tables belong to one group", size);
#ifdef _DEBUG_VIEW
......@@ -1362,7 +1376,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*)param;
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = dataRowTuple(pTable->tagVal); // todo not only the first column
int8_t type = pInfo->sch.type;
......@@ -1419,7 +1433,8 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
SExprTreeSupporter s = {.pTagSchema = schema, .numOfTags = schemaNCols(pSTable->tagSchema)};
SBinaryFilterSupp supp = {
.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s};
.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s,
};
SArray* pTableList = taosArrayInit(8, POINTER_BYTES);
......@@ -1430,7 +1445,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return TSDB_CODE_SUCCESS;
}
int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray** pGroupList,
int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) {
STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
......@@ -1448,9 +1463,9 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
taosArrayDestroy(res);
return ret;
}
*pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
taosArrayDestroy(res);
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
return ret;
}
......@@ -1465,25 +1480,27 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
}
doQueryTableList(pSTable, res, pExprNode);
*pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
taosArrayDestroy(res);
return ret;
}
int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, SArray** pGroupList) {
int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
return TSDB_CODE_INVALID_TABLE_ID;
}
//todo assert table type, add the table ref count
pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
*pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(group, &pTable);
taosArrayPush(*pGroupList, &group);
taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS;
}
......
......@@ -34,31 +34,13 @@ typedef struct SPatternCompareInfo {
char matchOne; // symbol for match one wildcard, default: '_'
} SPatternCompareInfo;
int32_t compareInt32Val(const void *pLeft, const void *pRight);
int32_t compareInt64Val(const void *pLeft, const void *pRight);
int32_t compareInt16Val(const void *pLeft, const void *pRight);
int32_t compareInt8Val(const void *pLeft, const void *pRight);
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight);
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight);
int32_t compareDoubleVal(const void *pLeft, const void *pRight);
int32_t compareStrVal(const void *pLeft, const void *pRight);
int32_t compareWStrVal(const void *pLeft, const void *pRight);
int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo);
int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo);
__compar_fn_t getKeyComparFunc(int32_t keyType);
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType);
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr);
#ifdef __cplusplus
}
......
......@@ -107,6 +107,11 @@ extern "C" {
#define POW2(x) ((x) * (x))
typedef struct SPair {
void* first;
void* sec;
} SPair;
int32_t strdequote(char *src);
void strtrim(char *src);
......
......@@ -40,25 +40,23 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) {
}
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) {
// int64_t lhs = ((SSkipListKey *)pLeft)->i64Key;
// double rhs = ((SSkipListKey *)pRight)->dKey;
// if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0;
// } else {
// return (lhs > rhs) ? 1 : -1;
// }
return 0;
int64_t lhs = GET_INT64_VAL(pLeft);
double rhs = GET_DOUBLE_VAL(pRight);
if (fabs(lhs - rhs) < FLT_EPSILON) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
}
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) {
// double lhs = ((SSkipListKey *)pLeft)->dKey;
// int64_t rhs = ((SSkipListKey *)pRight)->i64Key;
// if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0;
// } else {
// return (lhs > rhs) ? 1 : -1;
// }
return 0;
double lhs = GET_DOUBLE_VAL(pLeft);
int64_t rhs = GET_INT64_VAL(pRight);
if (fabs(lhs - rhs) < FLT_EPSILON) {
return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
}
int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
......@@ -241,7 +239,8 @@ static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void*
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
// todo promote the type definition before the comparsion
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr) {
__compar_fn_t comparFn = NULL;
switch (type) {
......@@ -250,10 +249,15 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: {
// assert(type == filterDataType);
if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) {
comparFn = compareInt64Val;
break;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparFn = compareIntDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_BOOL: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
......@@ -265,22 +269,37 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
// comparFn = compareDoubleIntVal;
// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
// comparFn = compareDoubleVal;
// }
if (filterDataType == TSDB_DATA_TYPE_DOUBLE) {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparFn = compareDoubleIntVal;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparFn = compareDoubleVal;
}
break;
}
case TSDB_DATA_TYPE_BINARY:
comparFn = compareStrVal;
case TSDB_DATA_TYPE_BINARY: {
assert(filterDataType == TSDB_DATA_TYPE_BINARY);
if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */
comparFn = compareStrPatternComp;
} else { /* normal relational comparFn */
comparFn = compareStrVal;
}
break;
case TSDB_DATA_TYPE_NCHAR:
comparFn = compareWStrVal;
}
case TSDB_DATA_TYPE_NCHAR: {
assert(filterDataType == TSDB_DATA_TYPE_NCHAR);
if (optr == TSDB_RELATION_LIKE) {
comparFn = compareWStrPatternComp;
} else {
comparFn = compareWStrVal;
}
break;
}
default:
comparFn = compareInt32Val;
break;
......
......@@ -308,7 +308,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
pSkipList->state.queryCount++;
#endif
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType);
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType, 0);
int32_t ret = -1;
for (int32_t i = sLevel; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
......@@ -372,7 +372,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
int32_t ret = -1;
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type);
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type, 0);
SSkipListNode* pNode = pSkipList->pHead;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
......
......@@ -25,7 +25,7 @@
#include "dataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "queryExecutor.h"
#include "query.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
......@@ -54,7 +54,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
int32_t code = TSDB_CODE_SUCCESS;
SQInfo* pQInfo = NULL;
qinfo_t pQInfo = NULL;
if (contLen != 0) {
void* tsdb = vnodeGetTsdb(pVnode);
pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册