diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index fa41ff8a3039d06a37d82df842201dcce265da10..8058d7f054b126fdc479c720ca0dc89d4766dff8 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -609,21 +609,21 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
return size;
}
-static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vnodeId, char *pMsg) {
+static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vgId, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
- tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables);
+ tscTrace("%p vgId:%d, query on %d tables", pSql, vgId, numOfTables);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid);
#endif
- STableIdInfo *pTableMetaInfo = (STableIdInfo *)pMsg;
- pTableMetaInfo->sid = htonl(pTableMeta->sid);
- pTableMetaInfo->uid = htobe64(pTableMeta->uid);
- pTableMetaInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
+ STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
+ pTableIdInfo->sid = htonl(pTableMeta->sid);
+ pTableIdInfo->uid = htobe64(pTableMeta->uid);
+ pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableIdInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
@@ -676,6 +676,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->uid = pTableMeta->uid;
pQueryMsg->numOfTagsCols = 0;
+ pQueryMsg->vgId = htonl(pTableMeta->vgid);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query on super table
if (pTableMetaInfo->vnodeIndex < 0) {
@@ -693,7 +694,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
- pQueryMsg->vnode = htons(vnodeId);
+ pQueryMsg->vgId = htons(vnodeId);
}
pQueryMsg->numOfTables = htonl(numOfTables);
@@ -761,14 +762,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
SSchema * pColSchema = &pSchema[pCol->colIndex.columnIndex];
- if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
- pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
- tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
- htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
- pColSchema->name);
-
- return -1; // 0 means build msg failed
- }
+// if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
+// pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
+// tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
+// htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
+// pColSchema->name);
+//
+// return -1; // 0 means build msg failed
+// }
pQueryMsg->colList[i].colId = htons(pColSchema->colId);
pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
@@ -862,7 +863,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags)
- pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vnode), pMsg);
+ pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg);
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pTableMetaInfo->numOfTags > 0) {
@@ -943,7 +944,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
-
+
+ pQueryMsg->contLen = htonl(msgLen);
+
assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS;
diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c
index 8e6d20dba67c72007ed5fcc6dc0f4dd919b94249..b511a6bf088fef84b88adf16347a56ff022ce8a4 100644
--- a/src/dnode/src/dnodeRead.c
+++ b/src/dnode/src/dnodeRead.c
@@ -229,21 +229,23 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL;
- int32_t ret = qCreateQueryInfo(pQueryTableMsg, &pQInfo);
+ int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo);
- dTrace("query msg is disposed, qInfo:%p", pQueryTableMsg);
-
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
- pRsp->code = 0;
+ pRsp->code = code;
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
- .code = 0,
+ .code = code,
.msgType = 0
};
+
+ // do execute query
+ qTableQuery(pQInfo);
+
rpcSendResponse(&rpcRsp);
}
@@ -252,21 +254,51 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
void *pQInfo = htobe64(pRetrieve->qhandle);
dTrace("retrieve msg is disposed, qInfo:%p", pQInfo);
-
- assert(pQInfo != NULL);
- int32_t contLen = 100;
- SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen);
- pRsp->numOfRows = 0;
- pRsp->precision = 0;
- pRsp->offset = 0;
- pRsp->useconds = 0;
-
- SRpcMsg rpcRsp = {
- .handle = pMsg->rpcMsg.handle,
- .pCont = pRsp,
- .contLen = contLen,
- .code = 0,
- .msgType = 0
- };
+
+ int32_t rowSize = 0;
+ int32_t numOfRows = 0;
+ int32_t contLen = 0;
+
+ SRpcMsg rpcRsp = {0};
+
+ int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
+ if (code != TSDB_CODE_SUCCESS) {
+ contLen = sizeof(SRetrieveTableRsp);
+
+ SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
+ pRsp->numOfRows = 0;
+ pRsp->precision = 0;
+ pRsp->offset = 0;
+ pRsp->useconds = 0;
+
+ rpcRsp = (SRpcMsg) {
+ .handle = pMsg->rpcMsg.handle,
+ .pCont = pRsp,
+ .contLen = contLen,
+ .code = code,
+ .msgType = 0
+ };
+
+ //todo free qinfo
+ } else {
+ contLen = 100;
+
+ SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
+ pRsp->numOfRows = 0;
+ pRsp->precision = 0;
+ pRsp->offset = 0;
+ pRsp->useconds = 0;
+
+ *(int64_t*) pRsp->data = 1000;
+
+ rpcRsp = (SRpcMsg) {
+ .handle = pMsg->rpcMsg.handle,
+ .pCont = pRsp,
+ .contLen = contLen,
+ .code = code,
+ .msgType = 0
+ };
+ }
+
rpcSendResponse(&rpcRsp);
}
diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h
index 330e303f60db99dc93477a2b81c455ba85af7b91..d8a052c161697746f2d7acce9597d663d9002211 100644
--- a/src/inc/taosmsg.h
+++ b/src/inc/taosmsg.h
@@ -451,10 +451,10 @@ typedef struct STimeWindow {
* the outputCols will be 3 while the numOfCols is 1.
*/
typedef struct {
- int16_t vnode;
+ int32_t contLen; // msg header
+ int16_t vgId;
+
int32_t numOfTables;
- uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may
-
uint64_t uid;
STimeWindow window;
diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt
index 0984aeb585df2f00d23853c6c88498e8ca14b29a..0e51962f49999ae55c1c18aa5acd9d9baba64be7 100644
--- a/src/query/CMakeLists.txt
+++ b/src/query/CMakeLists.txt
@@ -11,5 +11,5 @@ INCLUDE_DIRECTORIES(inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(query ${SRC})
- TARGET_LINK_LIBRARIES(query tutil m rt)
+ TARGET_LINK_LIBRARIES(query tsdb tutil m rt)
ENDIF ()
\ No newline at end of file
diff --git a/src/query/inc/qextbuffer.h b/src/query/inc/qextbuffer.h
index 32df93d1e51693bbc900ece5e3c6f334aa8b97fb..598b809d92156c6fea1bcacd5dabafb89d4ee490 100644
--- a/src/query/inc/qextbuffer.h
+++ b/src/query/inc/qextbuffer.h
@@ -124,9 +124,8 @@ typedef struct tTagSchema {
typedef struct tSidSet {
int32_t numOfSids;
int32_t numOfSubSet;
- STableIdInfo **pSids;
+ STableIdInfo **pTableIdList;
int32_t * starterPos; // position of each subgroup, generated according to
-
SColumnModel *pColumnModel;
SColumnOrderInfo orderIdx;
} tSidSet;
diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h
index 6ff20affb00df71da3070badf5b24c57914f66b2..8956fb52b13e557ec5f1d36a7210cacff72a35e8 100644
--- a/src/query/inc/queryExecutor.h
+++ b/src/query/inc/queryExecutor.h
@@ -25,6 +25,7 @@
#include "taosdef.h"
#include "tref.h"
#include "tsqlfunction.h"
+#include "tarray.h"
typedef struct SData {
int32_t num;
@@ -39,7 +40,7 @@ enum {
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
-typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
+typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
@@ -142,7 +143,7 @@ typedef struct SQuery {
int32_t pos;
int64_t pointsOffset; // the number of points offset to save read data
SData** sdata;
-
+ int32_t capacity;
SSingleColumnFilterInfo* pFilterInfo;
} SQuery;
@@ -152,7 +153,6 @@ typedef struct SQueryCostSummary {
typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
-// void* pTabObj;
SData** pInterpoBuf;
SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage;
@@ -174,16 +174,17 @@ typedef struct SQInfo {
TSKEY startTime;
int64_t elapsedTime;
SResultRec rec;
- int pointsReturned;
- int pointsInterpo;
- int code; // error code to returned to client
+ int32_t pointsReturned;
+ int32_t pointsInterpo;
+ int32_t code; // error code to returned to client
+ int32_t killed; // denotes if current query is killed
sem_t dataReady;
- SHashObj* pTableList; // table list
+ SArray* pTableIdList; // table list
SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */
- tSidSet* pSidSet;
-
+// tSidSet* pSidSet;
+
T_REF_DECLARE()
/*
* the query is executed position on which meter of the whole list.
@@ -210,7 +211,7 @@ int32_t qCreateQueryInfo(SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
* query on single table
* @param pReadMsg
*/
-void qTableQuery(void* pReadMsg);
+void qTableQuery(SQInfo* pQInfo);
/**
* query on super table
@@ -218,4 +219,13 @@ void qTableQuery(void* pReadMsg);
*/
void qSuperTableQuery(void* pReadMsg);
+/**
+ * wait for the query completed, and retrieve final results to client
+ * @param pQInfo
+ */
+int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
+
+
+//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf);
+
#endif // TDENGINE_QUERYEXECUTOR_H
diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c
index 8672fb9c5e8ba19d0b046fb8311771906a98d920..0a5abccdd576710f69419298bc619a806efab143 100644
--- a/src/query/src/queryExecutor.c
+++ b/src/query/src/queryExecutor.c
@@ -12,18 +12,19 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#include
#include "os.h"
+#include "taosmsg.h"
#include "hash.h"
#include "hashfunc.h"
-#include "taosmsg.h"
#include "tlog.h"
#include "tlosertree.h"
#include "tscompression.h"
#include "tstatus.h"
#include "ttime.h"
+#include "qast.h"
+
#include "qresultBuf.h"
#include "queryExecutor.h"
#include "queryUtil.h"
@@ -52,9 +53,9 @@
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
- ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.bytes)
+ ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes)
#define GET_COLUMN_TYPE(query, colidx) \
- ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.type)
+ ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type)
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
@@ -1505,9 +1506,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel
pCtx->inputType = pSchema->type;
pCtx->inputBytes = pSchema->bytes;
} else {
- assert(0);
- // pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
- // pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
+ pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
+ pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
}
pCtx->ptsOutputBuf = NULL;
@@ -1558,9 +1558,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel
}
setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx);
-
- // for loading block data in memory
- // assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock);
return TSDB_CODE_SUCCESS;
_error_clean:
@@ -1893,8 +1890,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0;
}
-
- assert(0);
+
// pQuery->pointsOffset = pQuery->pointsToRead;
}
@@ -2243,10 +2239,10 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
num = 128;
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
- size_t s = taosHashGetSize(pQInfo->pTableList);
+ size_t s = taosArrayGetSize(pQInfo->pTableIdList);
num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset
- num = pQInfo->pSidSet->numOfSubSet;
+// num = pQInfo->pSidSet->numOfSubSet;
}
assert(num > 0);
@@ -2290,16 +2286,11 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
- if (pQInfo->pTableList != NULL) {
- taosHashCleanup(pQInfo->pTableList);
- pQInfo->pTableList = NULL;
- }
-
// tSidSetDestroy(&pQInfo->pSidSet);
if (pQInfo->pTableDataInfo != NULL) {
- size_t num = taosHashGetSize(pQInfo->pTableList);
- for (int32_t j = 0; j < num; ++j) {
+// size_t num = taosHashGetSize(pQInfo->pTableIdList);
+ for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
}
@@ -2337,11 +2328,11 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pQuery->lastKey = pQuery->window.skey;
// create runtime environment
- SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
+// SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
// get one queried meter
assert(0);
- // SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[0]->sid);
+ // SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[0]->sid);
pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1;
@@ -2388,7 +2379,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
SArray *sa = taosArrayInit(1, POINTER_BYTES);
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
- // SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid);
+ // SMeterObj *p1 = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid);
// taosArrayPush(sa, &p1);
// }
@@ -2418,7 +2409,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
*/
void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
if (pQInfo != NULL) {
- assert(taosHashGetSize(pQInfo->pTableList) >= 1);
+// assert(taosHashGetSize(pQInfo->pTableIdList) >= 1);
}
#if 0
@@ -2429,7 +2420,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
} else {
int32_t num = 0;
for (int32_t i = 0; i < pQInfo->numOfMeters; ++i) {
- SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid);
+ SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
@@ -2592,15 +2583,72 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
return pDataBlock;
}
+int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
+ int firstPos, lastPos, midPos = -1;
+ int numOfPoints;
+ TSKEY *keyList;
+
+ if (num <= 0) return -1;
+
+ keyList = (TSKEY *)pValue;
+ firstPos = 0;
+ lastPos = num - 1;
+
+ if (order == 0) {
+ // find the first position which is smaller than the key
+ while (1) {
+ if (key >= keyList[lastPos]) return lastPos;
+ if (key == keyList[firstPos]) return firstPos;
+ if (key < keyList[firstPos]) return firstPos - 1;
+
+ numOfPoints = lastPos - firstPos + 1;
+ midPos = (numOfPoints >> 1) + firstPos;
+
+ if (key < keyList[midPos]) {
+ lastPos = midPos - 1;
+ } else if (key > keyList[midPos]) {
+ firstPos = midPos + 1;
+ } else {
+ break;
+ }
+ }
+
+ } else {
+ // find the first position which is bigger than the key
+ while (1) {
+ if (key <= keyList[firstPos]) return firstPos;
+ if (key == keyList[lastPos]) return lastPos;
+
+ if (key > keyList[lastPos]) {
+ lastPos = lastPos + 1;
+ if (lastPos >= num)
+ return -1;
+ else
+ return lastPos;
+ }
+
+ numOfPoints = lastPos - firstPos + 1;
+ midPos = (numOfPoints >> 1) + firstPos;
+
+ if (key < keyList[midPos]) {
+ lastPos = midPos - 1;
+ } else if (key > keyList[midPos]) {
+ firstPos = midPos + 1;
+ } else {
+ break;
+ }
+ }
+ }
+
+ return midPos;
+}
+
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
-#if 0
SQuery *pQuery = pRuntimeEnv->pQuery;
- assert(0);
-// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pTabObj->searchAlgorithm];
-
+
int64_t cnt = 0;
- dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pQuery),
- pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
+ dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
+ GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle;
@@ -2619,17 +2667,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) {
-// doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey,
-// pQueryHandle->window.ekey, &skey1, &ekey1, &w);
+ doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
} else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
TSKEY winStart = blockInfo.window.ekey - pQuery->intervalTime;
-// doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQueryHandle->window.ekey,
-// blockInfo.window.ekey, &skey1, &ekey1, &w);
+ doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w);
-// pWindowResInfo->startTime = pQueryHandle->window.skey;
+ pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey;
}
}
@@ -2638,20 +2684,17 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
-// int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, searchFn, &numOfRes,
-// &pRuntimeEnv->windowResInfo, pDataBlock);
+ int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
+ &pRuntimeEnv->windowResInfo, pDataBlock);
// dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, checked:%d",
// GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, pQueryHandle->cur.slot,
// pQuery->pos, blockInfo.size, forwardStep);
// save last access position
-// cnt += forwardStep;
-
-// if (queryPaused(pQuery, &blockInfo, forwardStep)) {
-// if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
-// break;
-// }
+ cnt += forwardStep;
+ if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
+ break;
}
}
@@ -2673,8 +2716,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
return cnt;
-#endif
- return 0;
}
static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTableQInfo->lastKey = pQuery->lastKey; }
@@ -2935,29 +2976,29 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) {
int64_t st = taosGetTimestampMs();
int32_t ret = TSDB_CODE_SUCCESS;
- while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
- int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
- int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
-
- assert(0);
- // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
- if (ret < 0) { // not enough disk space to save the data into disk
- return -1;
- }
-
- pQInfo->subgroupIdx += 1;
-
- // this group generates at least one result, return results
- if (ret > 0) {
- break;
- }
-
- assert(pQInfo->numOfGroupResultPages == 0);
- dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
- }
-
- dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery),
- pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
+// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
+// int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
+// int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
+//
+// assert(0);
+// // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
+// if (ret < 0) { // not enough disk space to save the data into disk
+// return -1;
+// }
+//
+// pQInfo->subgroupIdx += 1;
+//
+// // this group generates at least one result, return results
+// if (ret > 0) {
+// break;
+// }
+//
+// assert(pQInfo->numOfGroupResultPages == 0);
+// dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
+// }
+//
+// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery),
+// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
return TSDB_CODE_SUCCESS;
}
@@ -2972,10 +3013,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
// set current query completed
- if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) {
+// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) {
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
- return;
- }
+// return;
+// }
}
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
@@ -3282,7 +3323,7 @@ void disableFunctForSuppleScan(SQInfo *pQInfo, int32_t order) {
}
if (isIntervalQuery(pQuery)) {
- size_t numOfTables = taosHashGetSize(pQInfo->pTableList);
+ size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
@@ -3320,7 +3361,6 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
- // int32_t rows = pRuntimeEnv->pTabObj->pointsPerFileBlock;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
@@ -3339,8 +3379,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
- assert(0);
- // memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * rows);
+ memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity);
}
initCtxOutputBuf(pRuntimeEnv);
@@ -3884,7 +3923,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
- totalSubset = pQInfo->pSidSet->numOfSubSet;
+// totalSubset = pQInfo->pSidSet->numOfSubSet;
}
return totalSubset;
@@ -4275,7 +4314,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady);
- // pQInfo->over = 1;
+ pQInfo->killed = 1;
return TSDB_CODE_SUCCESS;
}
@@ -4298,10 +4337,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
taosArrayPush(cols, &pQuery->colList[i]);
}
- SArray* sa = taosArrayInit(1, sizeof(int16_t));
- taosArrayPush(sa, &pQInfo->pSidSet->pSids[0]->sid);
-
- pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, sa, cols);
+ pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, pQInfo->pTableIdList, cols);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->pQuery = pQuery;
@@ -4588,7 +4624,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
#if 0
SQuery* pQuery = pRuntimeEnv->pQuery;
-// tSidSet *pSids = pSupporter->pSidSet;
+// tSidSet *pTableIdList = pSupporter->pSidSet;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
@@ -4597,12 +4633,12 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
- while (pSupporter->subgroupIdx < pSids->numOfSubSet) {
- int32_t start = pSids->starterPos[pSupporter->subgroupIdx];
- int32_t end = pSids->starterPos[pSupporter->subgroupIdx + 1] - 1;
+ while (pSupporter->subgroupIdx < pTableIdList->numOfSubSet) {
+ int32_t start = pTableIdList->starterPos[pSupporter->subgroupIdx];
+ int32_t end = pTableIdList->starterPos[pSupporter->subgroupIdx + 1] - 1;
if (isFirstLastRowQuery(pQuery)) {
- dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet,
+ dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pSupporter->subgroupIdx);
TSKEY key = -1;
@@ -4635,7 +4671,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
int64_t num = doCheckMetersInGroup(pQInfo, index, start);
assert(num >= 0);
} else {
- dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet,
+ dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pSupporter->subgroupIdx);
for (int32_t k = start; k <= end; ++k) {
@@ -4680,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
}
- if (pSupporter->meterIdx >= pSids->numOfTables) {
+ if (pSupporter->meterIdx >= pTableIdList->numOfTables) {
return;
}
@@ -4833,7 +4869,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
dTrace(
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"next skey:%" PRId64 ", offset:%" PRId64,
- pQInfo, vid, pSids->numOfTables, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
+ pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
#endif
}
@@ -4878,7 +4914,7 @@ static void doOrderedScan(SQInfo *pQInfo) {
static void setupMeterQueryInfoForSupplementQuery(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
- int32_t num = taosHashGetSize(pQInfo->pTableList);
+ int32_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < num; ++i) {
// STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
@@ -5017,7 +5053,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
*/
-static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) {
+static void tableFixedOutputProcessor(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
@@ -5044,7 +5080,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) {
pQInfo->rec.pointsRead = pQuery->rec.pointsRead;
}
-static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) {
+static void tableMultiOutputProcessor(SQInfo *pQInfo) {
#if 0
SQuery * pQuery = &pQInfo->query;
SMeterObj *pMeterObj = pQInfo->pObj;
@@ -5148,7 +5184,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
}
/* handle time interval query on single table */
-static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
+static void tableIntervalProcessor(SQInfo *pQInfo) {
// STable *pMeterObj = pQInfo->pObj;
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
@@ -5212,31 +5248,18 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
// pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
}
-void qTableQuery(void *pReadMsg) {
- // SQInfo *pQInfo = (SQInfo *)pReadMsg->ahandle;
-
-#if 0
- if (pQInfo == NULL) {
- dTrace("%p freed abort query", pQInfo);
+void qTableQuery(SQInfo* pQInfo) {
+ assert(pQInfo != NULL);
+
+ if (pQInfo->killed) {
+ dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
-// if (pQInfo->killed) {
-// dTrace("QInfo:%p it is already killed, abort", pQInfo);
-// vnodeDecRefCount(pQInfo);
-//
-// return;
-// }
-
-// assert(pQInfo->refCount >= 1);
-
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
- SQuery * pQuery = &pRuntimeEnv->pQuery;
-
-// assert(pRuntimeEnv->pMeterObj == pMeterObj);
+ SQuery *pQuery = pRuntimeEnv->pQuery;
-// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid,
-// pMeterObj->meterId, pMeterObj->numOfQueries, pQInfo);
+// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo);
if (vnodeHasRemainResults(pQInfo)) {
/*
@@ -5254,15 +5277,9 @@ void qTableQuery(void *pReadMsg) {
pQInfo->pointsInterpo += numOfInterpo;
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
-// dTrace(
-// "QInfo:%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d "
-// "totalReturn:%d",
-// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
-// pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
-
+// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
+// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
sem_post(&pQInfo->dataReady);
-// vnodeDecRefCount(pQInfo);
-
return;
}
@@ -5287,26 +5304,20 @@ void qTableQuery(void *pReadMsg) {
// pQInfo->pointsInterpo, pQInfo->pointsReturned);
sem_post(&pQInfo->dataReady);
-// vnodeDecRefCount(pQInfo);
-
return;
}
}
}
- assert(0);
-// pQInfo->over = 1;
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid,
// pMeterObj->meterId, pQInfo->pointsRead);
// vnodePrintQueryStatistics(pSupporter);
sem_post(&pQInfo->dataReady);
-
-// vnodeDecRefCount(pQInfo);
return;
}
- /* number of points returned during this query */
+ // number of points returned during this query
pQuery->rec.pointsRead = 0;
int64_t st = taosGetTimestampUs();
@@ -5314,14 +5325,15 @@ void qTableQuery(void *pReadMsg) {
// group by normal column, sliding window query, interval query are handled by interval query processor
if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
// assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead);
- vnodeSingleTableIntervalProcessor(pQInfo);
+ tableIntervalProcessor(pQInfo);
} else {
if (isFixedOutputQuery(pQuery)) {
assert(pQuery->checkBufferInLoop == 0);
- vnodeSingleTableFixedOutputProcessor(pQInfo);
+
+ tableFixedOutputProcessor(pQInfo);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkBufferInLoop == 1);
- vnodeSingleTableMultiOutputProcessor(pQInfo);
+ tableMultiOutputProcessor(pQInfo);
}
}
@@ -5339,7 +5351,6 @@ void qTableQuery(void *pReadMsg) {
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
-#endif
}
void qSuperTableQuery(void *pReadMsg) {
@@ -5449,17 +5460,12 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) {
return 0;
}
-int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) {
- pQueryTableMsg->vnode = htons(pQueryTableMsg->vnode);
+static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTableIdList) {
+ pQueryTableMsg->vgId = htons(pQueryTableMsg->vgId);
pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables);
-
-#ifdef TSKEY32
- pQueryTableMsg->skey = htonl(pQueryTableMsg->skey);
- pQueryTableMsg->ekey = htonl(pQueryTableMsg->ekey);
-#else
+
pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey);
pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey);
-#endif
pQueryTableMsg->order = htons(pQueryTableMsg->order);
pQueryTableMsg->orderColId = htons(pQueryTableMsg->orderColId);
@@ -5477,17 +5483,17 @@ int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) {
pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit);
pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset);
+
pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset);
pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen);
pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks);
pQueryTableMsg->tsOrder = htonl(pQueryTableMsg->tsOrder);
-
+
// query msg safety check
if (validateQueryMeterMsg(pQueryTableMsg) != 0) {
return TSDB_CODE_INVALID_QUERY_MSG;
}
- // STableIdInfo **pSids = NULL;
char *pMsg = (char *)(pQueryTableMsg->colList) + sizeof(SColumnInfo) * pQueryTableMsg->numOfCols;
for (int32_t col = 0; col < pQueryTableMsg->numOfCols; ++col) {
@@ -5584,25 +5590,28 @@ int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) {
pQueryTableMsg->colNameList = (int64_t)pMsg;
pMsg += pQueryTableMsg->colNameLen;
}
-
- STableIdInfo **pSids = (STableIdInfo **)calloc(pQueryTableMsg->numOfTables, sizeof(STableIdInfo *));
- pQueryTableMsg->pSidExtInfo = (uint64_t)pSids;
-
- pSids[0] = (STableIdInfo *)pMsg;
- pSids[0]->sid = htonl(pSids[0]->sid);
- pSids[0]->uid = htobe64(pSids[0]->uid);
- pSids[0]->key = htobe64(pSids[0]->key);
-
+
+ *pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo));
+
+ STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg;
+ pTableIdInfo->sid = htonl(pTableIdInfo->sid);
+ pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
+ pTableIdInfo->key = htobe64(pTableIdInfo->key);
+
+ taosArrayPush(*pTableIdList, pTableIdInfo);
+ pMsg += sizeof(STableIdInfo);
+
for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) {
- pSids[j] = (STableIdInfo *)((char *)pSids[j - 1] + sizeof(STableIdInfo) + pQueryTableMsg->tagLength);
- pSids[j]->sid = htonl(pSids[j]->sid);
- pSids[j]->uid = htobe64(pSids[j]->uid);
- pSids[j]->key = htobe64(pSids[j]->key);
+ pTableIdInfo = (STableIdInfo *)pMsg;
+
+ pTableIdInfo->sid = htonl(pTableIdInfo->sid);
+ pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
+ pTableIdInfo->key = htobe64(pTableIdInfo->key);
+
+ taosArrayPush(*pTableIdList, pTableIdInfo);
+ pMsg += sizeof(STableIdInfo);
}
- pMsg = (char *)pSids[pQueryTableMsg->numOfTables - 1];
- pMsg += sizeof(STableIdInfo) + pQueryTableMsg->tagLength;
-
if (pQueryTableMsg->numOfGroupCols > 0 || pQueryTableMsg->numOfTagsCols > 0) { // group by tag columns
pQueryTableMsg->pTagSchema = (uint64_t)pMsg;
SSchema *pTagSchema = (SSchema *)pQueryTableMsg->pTagSchema;
@@ -5698,9 +5707,10 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs
return TSDB_CODE_SUCCESS;
}
-int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) {
+static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) {
*pSqlFuncExpr = NULL;
-
+ int32_t code = TSDB_CODE_SUCCESS;
+
SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols);
if (pExprs == NULL) {
tfree(pQueryMsg->pSqlFuncExprs);
@@ -5732,24 +5742,24 @@ int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr
bytes = pTagSchema[pColumnIndexExInfo->colIdx].bytes;
} else { // parse the arithmetic expression
- // if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) {
- // *code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
- //
- // if (*code != TSDB_CODE_SUCCESS) {
- // tfree(pExprs);
- // return NULL;
- // }
- //
- // type = TSDB_DATA_TYPE_DOUBLE;
- // bytes = tDataTypeDesc[type].nSize;
- // } else { // parse the normal column
- // int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase);
- // assert(j < pQueryMsg->numOfCols);
- //
- // SColumnInfo* pCol = &pQueryMsg->colList[j];
- // type = pCol->type;
- // bytes = pCol->bytes;
- // }
+ if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) {
+ code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
+
+ if (code != TSDB_CODE_SUCCESS) {
+ tfree(pExprs);
+ return code;
+ }
+
+ type = TSDB_DATA_TYPE_DOUBLE;
+ bytes = tDataTypeDesc[type].nSize;
+ } else { // parse the normal column
+ int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase);
+ assert(j < pQueryMsg->numOfCols);
+
+ SColumnInfo* pCol = &pQueryMsg->colList[j];
+ type = pCol->type;
+ bytes = pCol->bytes;
+ }
}
int32_t param = pExprs[i].pBase.arg[0].argValue.i64;
@@ -5792,7 +5802,7 @@ int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr
return TSDB_CODE_SUCCESS;
}
-SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) {
+static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) {
return NULL;
}
@@ -5815,7 +5825,7 @@ SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *co
return pGroupbyExpr;
}
-int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
+static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].info.numOfFilters > 0) {
pQuery->numOfFilterCols++;
@@ -5899,7 +5909,7 @@ int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
return TSDB_CODE_SUCCESS;
}
-static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs) {
+static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray* pTableIdList) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
goto _clean_memory;
@@ -5934,6 +5944,21 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
if (pQuery->colList == NULL) {
goto _clean_memory;
}
+
+ for (int16_t i = 0; i < numOfCols; ++i) {
+ pQuery->colList[i].info = pQueryMsg->colList[i];
+// SColumnInfo *pColInfo = &pQuery->colList[i].data;
+// pColInfo->filters = NULL;
+// if (colList[i].numOfFilters > 0) {
+// pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo));
+//
+// for (int32_t j = 0; j < colList[i].numOfFilters; ++j) {
+// tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]);
+// }
+// } else {
+// pQuery->colList[i].data.filters = NULL;
+// }
+ }
// calculate the result row size
for (int16_t col = 0; col < numOfOutputCols; ++col) {
@@ -5952,13 +5977,13 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
goto _clean_memory;
}
+ // set the output buffer capacity
+ pQuery->capacity = 4096;
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
assert(pExprs[col].interResBytes >= pExprs[col].resBytes);
// allocate additional memory for interResults that are usually larger then final results
- // size_t size = (pQInfo->query.pointsToRead + 1) * pExprs[col].resBytes + pExprs[col].interResBytes +
- // sizeof(SData);
- size_t size = 1000;
+ size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
pQuery->sdata[col] = (SData *)calloc(1, size);
if (pQuery->sdata[col] == NULL) {
goto _clean_memory;
@@ -5977,10 +6002,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
// to make sure third party won't overwrite this structure
pQInfo->signature = (uint64_t)pQInfo;
+ pQInfo->pTableIdList = pTableIdList;
+
pQuery->pos = -1;
-
- // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
- // pQInfo);
+ // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo);
return pQInfo;
@@ -6005,28 +6030,107 @@ _clean_memory:
return NULL;
}
-int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
- SQInfo **pQInfo) {
- SQuery *pQuery;
+bool isQInfoValid(void *param) {
+ SQInfo *pQInfo = (SQInfo *)param;
+ if (pQInfo == NULL) {
+ return false;
+ }
+
+ /*
+ * pQInfo->signature may be changed by another thread, so we assign value of signature
+ * into local variable, then compare by using local variable
+ */
+ uint64_t sig = pQInfo->signature;
+ return (sig == (uint64_t)pQInfo);
+}
+
+void vnodeFreeQInfo(SQInfo* pQInfo, bool decQueryRef) {
+ if (!isQInfoValid(pQInfo)) {
+ return;
+ }
+
+ pQInfo->killed = 1;
+ dTrace("QInfo:%p start to free SQInfo", pQInfo);
+
+ if (decQueryRef) {
+ vnodeDecMeterRefcnt(pQInfo);
+ }
+
+ SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
+
+ for (int col = 0; col < pQuery->numOfOutputCols; ++col) {
+ tfree(pQuery->sdata[col]);
+ }
+
+// for (int col = 0; col < pQuery->numOfCols; ++col) {
+// vnodeFreeColumnInfo(&pQuery->colList[col].data);
+// }
+//
+// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
+// tfree(pQuery->tsData);
+// }
+
+ sem_destroy(&(pQInfo->dataReady));
+ vnodeQueryFreeQInfoEx(pQInfo);
+
+ for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
+ SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
+ if (pColFilter->numOfFilters > 0) {
+ tfree(pColFilter->pFilters);
+ }
+ }
+
+ tfree(pQuery->pFilterInfo);
+ tfree(pQuery->colList);
+ tfree(pQuery->sdata);
+
+ if (pQuery->pSelectExpr != NULL) {
+ for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
+ SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
+
+ if (pBinExprInfo->numOfCols > 0) {
+ tfree(pBinExprInfo->pReqColumns);
+ tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL);
+ }
+ }
+
+ tfree(pQuery->pSelectExpr);
+ }
+
+ if (pQuery->defaultVal != NULL) {
+ tfree(pQuery->defaultVal);
+ }
+
+ tfree(pQuery->pGroupbyExpr);
+ tfree(pQuery);
+
+// dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId);
+
+ //destroy signature, in order to avoid the query process pass the object safety check
+ memset(pQInfo, 0, sizeof(SQInfo));
+ tfree(pQInfo);
+}
+
+static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
+ SArray* pTableIdList, SQInfo **pQInfo) {
int32_t code = TSDB_CODE_SUCCESS;
- (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs);
+ (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
if (pQInfo == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
- pQuery = (*pQInfo)->runtimeEnv.pQuery;
+ SQuery* pQuery = (*pQInfo)->runtimeEnv.pQuery;
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
- STableIdInfo **pSids = (STableIdInfo **)pQueryMsg->pSidExtInfo;
- if (pSids != NULL && pSids[0]->key > 0) {
- pQuery->window.skey = pSids[0]->key;
- } else {
- pQuery->window.skey = pQueryMsg->window.skey;
- }
-
+// STableIdInfo **pTableIdList = (STableIdInfo **)pQueryMsg->pSidExtInfo;
+// if (pTableIdList != NULL && pTableIdList[0]->key > 0) {
+// pQuery->window.skey = pTableIdList[0]->key;
+// } else {
+ pQuery->window.skey = pQueryMsg->window.skey;
pQuery->window.ekey = pQueryMsg->window.ekey;
+
pQuery->lastKey = pQuery->window.skey;
if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) {
@@ -6038,10 +6142,6 @@ int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SS
vnodeParametersSafetyCheck(pQuery);
- (*pQInfo)->pTableList = taosHashInit(pQueryMsg->numOfTables, taosIntHash_32, false);
- // taosHashPut(pSupporter->pMetersHashTable, (const char *)&pMetersObj[0]->sid, sizeof(pMeterObj[0].sid),
- // (char *)&pMetersObj[0], POINTER_BYTES);
-
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset;
@@ -6062,13 +6162,11 @@ int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SS
// dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount);
-
- // taosScheduleTask(queryQhandle, &schedMsg);
return code;
_error:
// table query ref will be decrease during error handling
- // vnodeFreeQInfo(pQInfo, false);
+ vnodeFreeQInfo(*pQInfo, false);
return code;
}
@@ -6076,7 +6174,8 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
assert(pQueryTableMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS;
- if ((code = convertQueryTableMsg(pQueryTableMsg)) != TSDB_CODE_SUCCESS) {
+ SArray* pTableIdList = NULL;
+ if ((code = convertQueryTableMsg(pQueryTableMsg, &pTableIdList)) != TSDB_CODE_SUCCESS) {
return code;
}
@@ -6087,7 +6186,7 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
}
// todo check vnode status
- if (pQueryTableMsg->pSidExtInfo == 0) {
+ if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) {
dError("qmsg:%p, SQueryTableMsg wrong format", pQueryTableMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
@@ -6106,10 +6205,14 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) {
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
} else {
- code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pQInfo);
+ code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, pQInfo);
}
_query_over:
+ if (code != TSDB_CODE_SUCCESS) {
+ taosArrayDestroy(pTableIdList);
+ }
+
// if failed to add ref for all meters in this query, abort current query
// if (code != TSDB_CODE_SUCCESS) {
// vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber);
@@ -6126,4 +6229,34 @@ _query_over:
//
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
return TSDB_CODE_SUCCESS;
-}
\ No newline at end of file
+}
+
+int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize) {
+ if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
+ return TSDB_CODE_INVALID_QHANDLE;
+ }
+
+ if (pQInfo->killed) {
+ dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
+ if (pQInfo->code == TSDB_CODE_SUCCESS) {
+ return TSDB_CODE_QUERY_CANCELLED;
+ } else { // in case of not TSDB_CODE_SUCCESS, return the code to client
+ return abs(pQInfo->code);
+ }
+ }
+
+ sem_wait(&pQInfo->dataReady);
+
+ SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
+
+// *numOfRows = pQInfo->rec.pointsRead;
+// *rowsize = pQuery->rowSize;
+ *numOfRows = 1;
+
+// dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec,
+// *rowsize, *numOfRows, pQInfo->code);
+
+ if (pQInfo->code < 0) { // less than 0 means there are error existed.
+ return -pQInfo->code;
+ }
+}
diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c
index 36593e719a81cd58e6bf758356a2f958373a6e42..a62299c45fa83b7e021e2e91863e10eea9126046 100644
--- a/src/vnode/tsdb/src/tsdbRead.c
+++ b/src/vnode/tsdb/src/tsdbRead.c
@@ -16,3 +16,51 @@
#include "os.h"
#include "tsdb.h"
+tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
+
+}
+
+bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
+ return false;
+}
+
+SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
+
+}
+
+int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) {
+
+}
+
+SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
+
+}
+
+int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) {
+
+}
+
+int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) {
+
+}
+
+tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) {
+ return NULL;
+}
+
+SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) {
+
+}
+
+tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {
+
+}
+
+STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {
+
+}
+
+STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {
+
+}
+