diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h
index d38885ab2ee9734ae948e19d4816dcc8a8f73ce6..397a60d140e52d0a9e0e6a67ca57025793ba9cbf 100644
--- a/src/client/inc/tscLocalMerge.h
+++ b/src/client/inc/tscLocalMerge.h
@@ -20,8 +20,8 @@
extern "C" {
#endif
-#include "qextbuffer.h"
-#include "qfill.h"
+#include "qExtbuffer.h"
+#include "qFill.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "tsclient.h"
diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h
index 786133a8f35ff582d2add90523d5760a20b6398c..590f205e1da87d3bc434f374b6c173905a46c184 100644
--- a/src/client/inc/tscUtil.h
+++ b/src/client/inc/tscUtil.h
@@ -23,11 +23,11 @@ extern "C" {
/*
* @date 2018/09/30
*/
-#include "os.h"
-#include "tbuffer.h"
#include "exception.h"
-#include "qextbuffer.h"
+#include "os.h"
+#include "qExtbuffer.h"
#include "taosdef.h"
+#include "tbuffer.h"
#include "tscLocalMerge.h"
#include "tsclient.h"
diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h
index b5455ed1fb3c1c81a662376b0f8308a1a64c46ba..17840df4a4062002b54f4aea5293427a5a7a1c3b 100644
--- a/src/client/inc/tsclient.h
+++ b/src/client/inc/tsclient.h
@@ -31,8 +31,8 @@ extern "C" {
#include "tutil.h"
#include "qExecutor.h"
+#include "qTsbuf.h"
#include "qsqlparser.h"
-#include "qtsbuf.h"
#include "tcmdtype.h"
// forward declaration
diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c
index 262b7ab3f6fcc1d1b931371fbad4a860e3b231b5..72ccd5adc6ee835e9e25f9b9b8b2511cad797910 100644
--- a/src/client/src/tscFunctionImpl.c
+++ b/src/client/src/tscFunctionImpl.c
@@ -14,15 +14,15 @@
*/
#include "os.h"
-#include "qextbuffer.h"
-#include "qfill.h"
-#include "qhistogram.h"
-#include "qpercentile.h"
-#include "qsyntaxtreefunction.h"
-#include "qtsbuf.h"
+#include "qAst.h"
+#include "qExtbuffer.h"
+#include "qFill.h"
+#include "qHistogram.h"
+#include "qPercentile.h"
+#include "qSyntaxtreefunction.h"
+#include "qTsbuf.h"
#include "taosdef.h"
#include "taosmsg.h"
-#include "qast.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscompression.h"
@@ -74,7 +74,7 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
-void doFinalizer(SQLFunctionCtx *pCtx) { resetResultInfo(GET_RES_INFO(pCtx)); }
+void doFinalizer(SQLFunctionCtx *pCtx) { RESET_RESULT_INFO(GET_RES_INFO(pCtx)); }
typedef struct tValuePair {
tVariant v;
@@ -330,12 +330,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
-/**
- * the numOfRes should be kept, since it may be used later
- * and allow the ResultInfo to be re initialized
- */
-void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; }
-
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf) {
assert(pResInfo->interResultBuf == NULL);
diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c
index 83700ce0a573ccf15a474a58ec4ebfda2634e2fd..7f336daa91ea621db3c10ccabd2f93c5a76e0fd1 100644
--- a/src/client/src/tscLocal.c
+++ b/src/client/src/tscLocal.c
@@ -16,14 +16,14 @@
#include "os.h"
#include "taosmsg.h"
-#include "tcache.h"
-#include "tscUtil.h"
-#include "tsclient.h"
+#include "qExtbuffer.h"
#include "taosdef.h"
+#include "tcache.h"
+#include "tname.h"
#include "tscLog.h"
-#include "qextbuffer.h"
+#include "tscUtil.h"
#include "tschemautil.h"
-#include "tname.h"
+#include "tsclient.h"
static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength);
diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index b97e48644991686c2f2aa6400c2e2b5898702878..09db65a1051abec0c2781169a5605047b550ec8b 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -18,9 +18,9 @@
#define _DEFAULT_SOURCE
#include "os.h"
+#include "qAst.h"
#include "taos.h"
#include "taosmsg.h"
-#include "qast.h"
#include "tcompare.h"
#include "tname.h"
#include "tscLog.h"
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index d9922b8718b832de9f37073f27d44a9502e5da6c..9a1fa7762964ccee253419f61ccf853423011e46 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -294,52 +294,31 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
}
- if (rpcMsg->pCont == NULL) {
- rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
- } else {
- STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
- // if (rpcMsg->code != TSDB_CODE_RPC_NETWORK_UNAVAIL) {
- // if (pCmd->command == TSDB_SQL_CONNECT) {
- // rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
- // rpcFreeCont(rpcMsg->pCont);
- // return;
- // }
-
- // if (pCmd->command == TSDB_SQL_HB) {
- // rpcMsg->code = TSDB_CODE_RPC_NOT_READY;
- // rpcFreeCont(rpcMsg->pCont);
- // return;
- // }
-
- // if (pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
- // pCmd->command == TSDB_SQL_STABLEVGROUP || pCmd->command == TSDB_SQL_SHOW ||
- // pCmd->command == TSDB_SQL_RETRIEVE) {
- // // get table meta/vgroup query will not retry, do nothing
- // }
- // }
-
- if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_INSERT ||
- pCmd->command == TSDB_SQL_UPDATE_TAGS_VAL) &&
- (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
- rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
- tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
- // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
- if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
- pSql->cmd.submitSchema = 1;
- }
+ STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
- pSql->res.code = rpcMsg->code; // keep the previous error code
- if (pSql->retry > pSql->maxRetry) {
- tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
- } else {
- rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);
-
- // if there is an error occurring, proceed to the following error handling procedure.
- // todo add test cases
- if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
- rpcFreeCont(rpcMsg->pCont);
- return;
- }
+ int32_t cmd = pCmd->command;
+ if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
+ (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
+ rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
+ rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
+ rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
+ tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
+
+ // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
+ if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
+ pSql->cmd.submitSchema = 1;
+ }
+
+ pSql->res.code = rpcMsg->code; // keep the previous error code
+ if (pSql->retry > pSql->maxRetry) {
+ tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
+ } else {
+ rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);
+
+ // if there is an error occurring, proceed to the following error handling procedure.
+ if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
+ rpcFreeCont(rpcMsg->pCont);
+ return;
}
}
}
diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c
index d97bd413af40bf94acb4d3b712e1eb9075c6caf1..ff050dbbbf79a12508691e07195bc2ea6b55437b 100644
--- a/src/client/src/tscSql.c
+++ b/src/client/src/tscSql.c
@@ -15,7 +15,7 @@
#include "hash.h"
#include "os.h"
-#include "qast.h"
+#include "qAst.h"
#include "tcache.h"
#include "tnote.h"
#include "trpc.h"
diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c
index 1dbc52efb0868042b28f4e358c7af0df878e47e8..5d26d09fae1dcc6c947375004703a9e6621ec629 100644
--- a/src/client/src/tscSubquery.c
+++ b/src/client/src/tscSubquery.c
@@ -14,8 +14,8 @@
*/
#include "os.h"
-#include "qtsbuf.h"
-#include "qast.h"
+#include "qAst.h"
+#include "qTsbuf.h"
#include "tcompare.h"
#include "tscLog.h"
#include "tscSubquery.h"
diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c
index 957bdeeb7f0e329ca1cad49e17daf031d0850ee6..17adc0c03d804280c43df0326a955cd29fd1aa60 100644
--- a/src/client/src/tscUtil.c
+++ b/src/client/src/tscUtil.c
@@ -13,11 +13,11 @@
* along with this program. If not, see .
*/
-#include "os.h"
-#include "hash.h"
#include "tscUtil.h"
+#include "hash.h"
+#include "os.h"
+#include "qAst.h"
#include "taosmsg.h"
-#include "qast.h"
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
diff --git a/src/query/inc/qast.h b/src/query/inc/qAst.h
similarity index 98%
rename from src/query/inc/qast.h
rename to src/query/inc/qAst.h
index 918604f8c9b6122f255ace647119040e288956a4..00049b486d765f341a97ce45250ec764f67502db 100644
--- a/src/query/inc/qast.h
+++ b/src/query/inc/qAst.h
@@ -45,7 +45,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *);
*
*/
typedef struct tQueryInfo {
- int32_t colIndex; // index of column in schema
uint8_t optr; // expression operator
SSchema sch; // schema of tags
char* q;
diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h
index fb8750323f9a8f868516ef43b6d3ff96d08c646c..127c38a6f836196641ec1179da680d872e0eac8e 100644
--- a/src/query/inc/qExecutor.h
+++ b/src/query/inc/qExecutor.h
@@ -18,16 +18,16 @@
#include "os.h"
#include "hash.h"
-#include "qfill.h"
-#include "qresultBuf.h"
+#include "qFill.h"
+#include "qResultbuf.h"
+#include "qTsbuf.h"
#include "qsqlparser.h"
-#include "qtsbuf.h"
+#include "query.h"
#include "taosdef.h"
#include "tarray.h"
#include "tlockfree.h"
#include "tsdb.h"
#include "tsqlfunction.h"
-#include "query.h"
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
@@ -158,7 +158,7 @@ typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
SQLFunctionCtx* pCtx;
- int16_t numOfRowsPerPage;
+ int32_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo;
diff --git a/src/query/inc/qextbuffer.h b/src/query/inc/qExtbuffer.h
similarity index 98%
rename from src/query/inc/qextbuffer.h
rename to src/query/inc/qExtbuffer.h
index 2cbef2b1bea66654b262a3fb37061477969960f2..b57c48933f0878ca86384219b1c7f66446db28eb 100644
--- a/src/query/inc/qextbuffer.h
+++ b/src/query/inc/qExtbuffer.h
@@ -28,7 +28,7 @@ extern "C" {
#include "tdataformat.h"
#include "talgo.h"
-#define DEFAULT_PAGE_SIZE (1024L*64) // 16k larger than the SHistoInfo
+#define DEFAULT_PAGE_SIZE (1024L*4) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
diff --git a/src/query/inc/qfill.h b/src/query/inc/qFill.h
similarity index 99%
rename from src/query/inc/qfill.h
rename to src/query/inc/qFill.h
index ee5974708a1f16161456c582caabd3eceeb6e57e..db6a69c2c5a98b4b6716d4b3ae3890adacb6c2a3 100644
--- a/src/query/inc/qfill.h
+++ b/src/query/inc/qFill.h
@@ -21,8 +21,8 @@ extern "C" {
#endif
#include "os.h"
+#include "qExtbuffer.h"
#include "taosdef.h"
-#include "qextbuffer.h"
typedef struct {
STColumn col; // column info
diff --git a/src/query/inc/qhistogram.h b/src/query/inc/qHistogram.h
similarity index 100%
rename from src/query/inc/qhistogram.h
rename to src/query/inc/qHistogram.h
diff --git a/src/query/inc/qpercentile.h b/src/query/inc/qPercentile.h
similarity index 98%
rename from src/query/inc/qpercentile.h
rename to src/query/inc/qPercentile.h
index c1227dad77d278d9534df229b8a68c2b4f305a54..52f666c338aae175aceb117c2919c2229c2e0cfa 100644
--- a/src/query/inc/qpercentile.h
+++ b/src/query/inc/qPercentile.h
@@ -16,7 +16,7 @@
#ifndef TDENGINE_QPERCENTILE_H
#define TDENGINE_QPERCENTILE_H
-#include "qextbuffer.h"
+#include "qExtbuffer.h"
typedef struct MinMaxEntry {
union {
diff --git a/src/query/inc/qresultBuf.h b/src/query/inc/qResultbuf.h
similarity index 74%
rename from src/query/inc/qresultBuf.h
rename to src/query/inc/qResultbuf.h
index a323d530a27ada5eabf9c4316281ee01d782c090..8c8afb0957c862042e1da99e211ed06e091dee4a 100644
--- a/src/query/inc/qresultBuf.h
+++ b/src/query/inc/qResultbuf.h
@@ -20,9 +20,9 @@
extern "C" {
#endif
-#include "os.h"
-#include "qextbuffer.h"
#include "hash.h"
+#include "os.h"
+#include "qExtbuffer.h"
typedef struct SArray* SIDList;
@@ -33,14 +33,20 @@ typedef struct SDiskbasedResultBuf {
int32_t fd; // data file fd
int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages
- char* pBuf; // mmap buffer pointer
+ void* pBuf; // mmap buffer pointer
char* path; // file path
-
+ int32_t pageSize; // current used page size
+ int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* idsTable; // id hash table
SIDList list; // for each id, there is a page id list
+
+ void* iBuf; // inmemory buf
+ void* handle; // for debug purpose
+ void* emptyDummyIdList; // dummy id list
} SDiskbasedResultBuf;
-#define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5)
+#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
+#define DEFAULT_INMEM_BUF_PAGES 10
/**
* create disk-based result buffer
@@ -49,7 +55,8 @@ typedef struct SDiskbasedResultBuf {
* @param rowSize
* @return
*/
-int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle);
+int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
+ int32_t inMemPages, void* handle);
/**
*
@@ -81,8 +88,13 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id
* @return
*/
-#define GET_RES_BUF_PAGE_BY_ID(buf, id) ((tFilePage*)((buf)->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE*(id)))
-
+static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
+ if (id < pResultBuf->inMemPages) {
+ return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize);
+ } else {
+ return (tFilePage*) ((char*) pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize);
+ }
+}
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
diff --git a/src/query/inc/qsyntaxtreefunction.h b/src/query/inc/qSyntaxtreefunction.h
similarity index 100%
rename from src/query/inc/qsyntaxtreefunction.h
rename to src/query/inc/qSyntaxtreefunction.h
diff --git a/src/query/inc/qtsbuf.h b/src/query/inc/qTsbuf.h
similarity index 100%
rename from src/query/inc/qtsbuf.h
rename to src/query/inc/qTsbuf.h
diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h
index 78ae7be03046cbe2636a8c456708b2b5ff4fc089..7119cb75fe9f6604e42a96406ff631840477d5f7 100644
--- a/src/query/inc/qUtil.h
+++ b/src/query/inc/qUtil.h
@@ -49,7 +49,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
assert(pResult != NULL && pRuntimeEnv != NULL);
SQuery *pQuery = pRuntimeEnv->pQuery;
- tFilePage *page = GET_RES_BUF_PAGE_BY_ID(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
+ tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
@@ -59,6 +59,4 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
__filter_func_t *getRangeFilterFuncArray(int32_t type);
__filter_func_t *getValueFilterFuncArray(int32_t type);
-bool supportPrefilter(int32_t type);
-
#endif // TDENGINE_QUERYUTIL_H
diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h
index c687f01cbcfa0a0b81209afcf71ec44f8c8c43af..5ce9121cf1c4f5e88f3b68823431ad61923e70ec 100644
--- a/src/query/inc/tsqlfunction.h
+++ b/src/query/inc/tsqlfunction.h
@@ -255,7 +255,15 @@ extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval);
-void resetResultInfo(SResultInfo *pResInfo);
+/**
+ * the numOfRes should be kept, since it may be used later
+ * and allow the ResultInfo to be re initialized
+ */
+#define RESET_RESULT_INFO(_r) \
+ do { \
+ (_r)->initialized = false; \
+ } while (0)
+
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
diff --git a/src/query/src/qast.c b/src/query/src/qAst.c
similarity index 99%
rename from src/query/src/qast.c
rename to src/query/src/qAst.c
index da4eb8f3baf67639615ec86b1f0b7b44269219e6..c2578c15c0536ea222f376e833a8fe11fa269229 100644
--- a/src/query/src/qast.c
+++ b/src/query/src/qAst.c
@@ -16,17 +16,17 @@
#include "os.h"
-#include "tname.h"
-#include "qast.h"
-#include "tsdb.h"
#include "exception.h"
+#include "qAst.h"
+#include "qSyntaxtreefunction.h"
#include "qsqlparser.h"
-#include "qsyntaxtreefunction.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
+#include "tname.h"
+#include "tsdb.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstoken.h"
@@ -678,7 +678,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
tstr *name = (tstr*) tsdbGetTableName(*(void**) pData);
// todo speed up by using hash
- if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
+ if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) {
addToResult = pQueryInfo->compare(name, pQueryInfo->q);
} else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
@@ -716,7 +716,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
}
tQueryInfo *pQueryInfo = pExpr->_node.info;
- if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) {
+ if (pQueryInfo->sch.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && pQueryInfo->optr != TSDB_RELATION_LIKE) {
tQueryIndexColumn(pSkipList, pQueryInfo, result);
} else {
tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index fc859854b578da2b47bd44d8db8d1307accb70ea..906d0cfe678e89d2fb357bf3572f7e61ade70cb2 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -13,19 +13,19 @@
* along with this program. If not, see .
*/
#include "os.h"
+#include "qFill.h"
+#include "taosmsg.h"
#include "tcache.h"
#include "tglobal.h"
-#include "qfill.h"
-#include "taosmsg.h"
#include "exception.h"
#include "hash.h"
+#include "qAst.h"
#include "qExecutor.h"
+#include "qResultbuf.h"
#include "qUtil.h"
-#include "qresultBuf.h"
#include "query.h"
#include "queryLog.h"
-#include "qast.h"
#include "tlosertree.h"
#include "tscompression.h"
#include "ttime.h"
@@ -50,11 +50,6 @@
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
-/* get the qinfo struct address from the query struct address */
-#define GET_COLUMN_BYTES(query, colidx) \
- ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes)
-#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].type)
-
enum {
// when query starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u,
@@ -238,9 +233,7 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) {
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (pColIndex->flag == TSDB_COL_NORMAL) {
- /*
- * make sure the normal column locates at the second position if tbname exists in group by clause
- */
+ //make sure the normal column locates at the second position if tbname exists in group by clause
if (pGroupbyExpr->numOfGroupCols > 1) {
assert(pColIndex->colIndex > 0);
}
@@ -299,6 +292,17 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
return false;
}
+bool isProjQuery(SQuery *pQuery) {
+ for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
+ int32_t functId = pQuery->pSelectExpr[i].base.functionId;
+ if (functId != TSDB_FUNC_PRJ && functId != TSDB_FUNC_TAGPRJ) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; }
static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) {
@@ -394,15 +398,15 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
if (pWindowResInfo->size >= pWindowResInfo->capacity) {
int64_t newCap = pWindowResInfo->capacity * 1.5;
char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult));
- if (t != NULL) {
- pWindowResInfo->pResult = (SWindowResult *)t;
-
- int32_t inc = newCap - pWindowResInfo->capacity;
- memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc);
- } else {
- // todo
+ if (t == NULL) {
+ longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
+ pWindowResInfo->pResult = (SWindowResult *)t;
+
+ int32_t inc = newCap - pWindowResInfo->capacity;
+ memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc);
+
for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) {
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
}
@@ -475,7 +479,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
pData = getNewDataBuf(pResultBuf, sid, &pageId);
} else {
pageId = getLastPageId(list);
- pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, pageId);
+ pData = getResBufPage(pResultBuf, pageId);
if (pData->num >= numOfRowsPerPage) {
pData = getNewDataBuf(pResultBuf, sid, &pageId);
@@ -1008,7 +1012,6 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break;
}
-// assert(pRuntimeEnv->windowResInfo.hashList->size <= 2);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes, true);
if (pWindowRes == NULL) {
return -1;
@@ -1053,9 +1056,9 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
*type = pQuery->colList[colIndex].type;
*bytes = pQuery->colList[colIndex].bytes;
/*
- * the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare
- * stage, the remain meter may not have the required column in cache actually. So, the validation of required
- * column in cache with the corresponding meter schema is reinforced.
+ * the colIndex is acquired from the first tables of all qualified tables in this vnode during query prepare
+ * stage, the remain tables may not have the required column in cache actually. So, the validation of required
+ * column in cache with the corresponding schema is reinforced.
*/
int32_t numOfCols = taosArrayGetSize(pDataBlock);
@@ -1206,9 +1209,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
continue;
}
- // interval window query
+ // interval window query, decide the time window according to the primary timestamp
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
- // decide the time window according to the primary timestamp
int64_t ts = tsCols[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
@@ -1230,8 +1232,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
while (1) {
GET_NEXT_TIMEWINDOW(pQuery, &nextWin);
- if (/*pWindowResInfo->startTime > nextWin.skey ||*/
- (nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
+ if ((nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.skey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
break;
}
@@ -1489,6 +1490,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
goto _clean;
}
+ qDebug("QInfo:%p setup runtime env1", GET_QINFO_ADDR(pRuntimeEnv));
+
pRuntimeEnv->offset[0] = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
@@ -1533,6 +1536,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
}
}
+ qDebug("QInfo:%p setup runtime env2", GET_QINFO_ADDR(pRuntimeEnv));
+
// set the order information for top/bottom query
int32_t functionId = pCtx->functionId;
@@ -1553,17 +1558,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
}
}
+ qDebug("QInfo:%p setup runtime env3", GET_QINFO_ADDR(pRuntimeEnv));
+
char* buf = (char*) pRuntimeEnv->resultInfo + sizeof(SResultInfo) * pQuery->numOfOutput;
// set the intermediate result output buffer
setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
+ qDebug("QInfo:%p setup runtime env4", GET_QINFO_ADDR(pRuntimeEnv));
+
// if it is group by normal column, do not set output buffer, the output buffer is pResult
- if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !pRuntimeEnv->stableQuery) {
+ if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery) {
resetCtxOutputBuf(pRuntimeEnv);
}
+ qDebug("QInfo:%p setup runtime env5", GET_QINFO_ADDR(pRuntimeEnv));
+
setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx);
+
+ qDebug("QInfo:%p init completed", GET_QINFO_ADDR(pRuntimeEnv));
return TSDB_CODE_SUCCESS;
_clean:
@@ -1915,9 +1928,20 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
return num;
}
-static FORCE_INLINE int32_t getNumOfRowsInResultPage(SQuery *pQuery, bool topBotQuery, bool isSTableQuery) {
- int32_t rowSize = pQuery->rowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, topBotQuery, isSTableQuery);
- return (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize;
+static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) {
+ SQuery* pQuery = pRuntimeEnv->pQuery;
+
+ *rowsize = pQuery->rowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
+ int32_t overhead = sizeof(tFilePage);
+
+ // one page contains at least two rows
+ *ps = DEFAULT_INTERN_BUF_PAGE_SIZE;
+ while(((*rowsize) * 2) > (*ps) - overhead) {
+ *ps = (*ps << 1u);
+ }
+
+ pRuntimeEnv->numOfRowsPerPage = ((*ps) - sizeof(tFilePage)) / (*rowsize);
+
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
@@ -2043,8 +2067,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
} else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block.
- // If current data block is contained by all possible time window, loading current
- // data block is not needed.
+ // If current data block is contained by all possible time window, do not load current data block.
if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) {
status = BLK_DATA_ALL_NEEDED;
}
@@ -2364,6 +2387,18 @@ static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVa
}
}
+static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId) {
+ assert(pTagColList != NULL && numOfTags > 0);
+
+ for(int32_t i = 0; i < numOfTags; ++i) {
+ if (pTagColList[i].colId == colId) {
+ return &pTagColList[i];
+ }
+ }
+
+ return NULL;
+}
+
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
@@ -2372,16 +2407,10 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
assert(pExprInfo->base.numOfParams == 1);
- // todo refactor extract function.
- int16_t type = -1, bytes = -1;
- for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
- if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) {
- type = pQuery->tagColList[i].type;
- bytes = pQuery->tagColList[i].bytes;
- }
- }
+ int16_t tagColId = pExprInfo->base.arg->argValue.i64;
+ SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
- doSetTagValueInParam(tsdb, pTable, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes);
+ doSetTagValueInParam(tsdb, pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes);
} else {
// set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
@@ -2399,20 +2428,14 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
// set the join tag for first column
SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
- if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
- pRuntimeEnv->pTSBuf != NULL) {
+ if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pRuntimeEnv->pTSBuf != NULL &&
+ pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(pFuncMsg->numOfParams == 1);
- // todo refactor
- int16_t type = -1, bytes = -1;
- for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
- if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) {
- type = pQuery->tagColList[i].type;
- bytes = pQuery->tagColList[i].bytes;
- }
- }
+ int16_t tagColId = pExprInfo->base.arg->argValue.i64;
+ SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
- doSetTagValueInParam(tsdb, pTable, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes);
+ doSetTagValueInParam(tsdb, pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes);
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64,
pRuntimeEnv->pCtx[0].tag.i64Key)
}
@@ -2429,7 +2452,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes;
pCtx[i].currentStage = FIRST_STAGE_MERGE;
- resetResultInfo(pCtx[i].resultInfo);
+ RESET_RESULT_INFO(pCtx[i].resultInfo);
aAggs[functionId].init(&pCtx[i]);
}
@@ -2666,7 +2689,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
int32_t size = taosArrayGetSize(list);
for (int32_t i = 0; i < size; ++i) {
int32_t* pgId = taosArrayGet(list, i);
- tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId);
+ tFilePage *pData = getResBufPage(pResultBuf, *pgId);
total += pData->num;
}
@@ -2675,7 +2698,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
int32_t offset = 0;
for (int32_t j = 0; j < size; ++j) {
int32_t* pgId = taosArrayGet(list, j);
- tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId);
+ tFilePage *pData = getResBufPage(pResultBuf, *pgId);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
@@ -2865,10 +2888,10 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
- int32_t capacity = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
// the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
int32_t pageId = -1;
+ int32_t capacity = pResultBuf->numOfRowsPerPage;
int32_t remain = pQuery->sdata[0]->num;
int32_t offset = 0;
@@ -3038,7 +3061,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
* set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
*/
- resetResultInfo(&pRuntimeEnv->resultInfo[i]);
+ RESET_RESULT_INFO(&pRuntimeEnv->resultInfo[i]);
pCtx->resultInfo = &pRuntimeEnv->resultInfo[i];
// set the timestamp output buffer for top/bottom/diff query
@@ -3077,7 +3100,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
pRuntimeEnv->pCtx[j].ptsOutputBuf += TSDB_KEYSIZE * output;
}
- resetResultInfo(pRuntimeEnv->pCtx[j].resultInfo);
+ RESET_RESULT_INFO(pRuntimeEnv->pCtx[j].resultInfo);
}
}
@@ -3331,8 +3354,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
- pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
+ pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN;
@@ -3467,7 +3490,6 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
return;
}
- int32_t GROUPRESULTID = 1;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
sizeof(groupIndex), true);
if (pWindowRes == NULL) {
@@ -3479,7 +3501,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pWindowRes->pos.pageId == -1) {
- if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage) !=
+ if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) !=
TSDB_CODE_SUCCESS) {
return;
}
@@ -4149,6 +4171,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
} else {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
}
+
return terrno;
}
@@ -4174,10 +4197,9 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
}
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
- int32_t code = TSDB_CODE_SUCCESS;
-
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
+ int32_t code = TSDB_CODE_SUCCESS;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pQuery->precision = tsdbGetCfg(tsdb)->precision;
@@ -4186,6 +4208,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
setScanLimitationByResultBuffer(pQuery);
changeExecuteScanOrder(pQInfo, false);
+
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
if (code != TSDB_CODE_SUCCESS) {
return code;
@@ -4212,33 +4235,42 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
return code;
}
- pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, pRuntimeEnv->topBotQuery, isSTableQuery);
+ int32_t ps = DEFAULT_PAGE_SIZE;
+ int32_t rowsize = 0;
+ getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) {
- int32_t rows = getInitialPageNum(pQInfo);
- code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
+ int32_t numOfPages = getInitialPageNum(pQInfo);
+ code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfPages, rowsize, ps, numOfPages, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
int16_t type = TSDB_DATA_TYPE_NULL;
+ int32_t threshold = 0;
if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
+ threshold = 4000;
} else {
type = TSDB_DATA_TYPE_INT; // group id
+ threshold = GET_NUM_OF_TABLEGROUP(pQInfo);
+ if (threshold < 8) {
+ threshold = 8;
+ }
}
- code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 32, 4096, type);
+ code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 8, threshold, type);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
-
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
- int32_t rows = getInitialPageNum(pQInfo);
- code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
+ int32_t numOfResultRows = getInitialPageNum(pQInfo);
+ getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
+
+ code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfResultRows, rowsize, ps, numOfResultRows, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -4250,7 +4282,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type = TSDB_DATA_TYPE_TIMESTAMP;
}
- code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type);
+ code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, numOfResultRows, 4096, type);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@@ -5693,7 +5725,6 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
}
}
-
static int compareTableIdInfo(const void* a, const void* b) {
const STableIdInfo* x = (const STableIdInfo*)a;
const STableIdInfo* y = (const STableIdInfo*)b;
@@ -5709,13 +5740,18 @@ static void calResultBufSize(SQuery* pQuery) {
const int32_t RESULT_MSG_MIN_ROWS = 8192;
const float RESULT_THRESHOLD_RATIO = 0.85;
- int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->rowSize;
- if (numOfRes < RESULT_MSG_MIN_ROWS) {
- numOfRes = RESULT_MSG_MIN_ROWS;
- }
+ if (isProjQuery(pQuery)) {
+ int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->rowSize;
+ if (numOfRes < RESULT_MSG_MIN_ROWS) {
+ numOfRes = RESULT_MSG_MIN_ROWS;
+ }
- pQuery->rec.capacity = numOfRes;
- pQuery->rec.threshold = numOfRes * RESULT_THRESHOLD_RATIO;
+ pQuery->rec.capacity = numOfRes;
+ pQuery->rec.threshold = numOfRes * RESULT_THRESHOLD_RATIO;
+ } else { // in case of non-prj query, a smaller output buffer will be used.
+ pQuery->rec.capacity = 4096;
+ pQuery->rec.threshold = pQuery->rec.capacity * RESULT_THRESHOLD_RATIO;
+ }
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
@@ -5727,6 +5763,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
if (pQInfo == NULL) {
goto _cleanup_qinfo;
}
+
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
pQInfo->tableGroupInfo = *pTableGroupInfo;
@@ -5926,6 +5963,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
pQuery->window.ekey, pQuery->order.order);
setQueryStatus(pQuery, QUERY_COMPLETED);
pQInfo->tableqinfoGroupInfo.numOfTables = 0;
+
sem_post(&pQInfo->dataReady);
return TSDB_CODE_SUCCESS;
}
@@ -6044,11 +6082,10 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->sdata);
tfree(pQuery);
+ pQInfo->signature = 0;
qDebug("QInfo:%p QInfo is freed", pQInfo);
- // destroy signature, in order to avoid the query process pass the object safety check
- memset(pQInfo, 0, sizeof(SQInfo));
tfree(pQInfo);
}
@@ -6136,16 +6173,17 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
int32_t code = TSDB_CODE_SUCCESS;
- char * tagCond = NULL, *tbnameCond = NULL;
- SArray * pTableIdList = NULL;
- SSqlFuncMsg **pExprMsg = NULL;
- SColIndex * pGroupColIndex = NULL;
- SColumnInfo* pTagColumnInfo = NULL;
- SExprInfo *pExprs = NULL;
- SSqlGroupbyExpr *pGroupbyExpr = NULL;
+ char *tagCond = NULL;
+ char *tbnameCond = NULL;
+ SArray *pTableIdList = NULL;
+ SSqlFuncMsg **pExprMsg = NULL;
+ SExprInfo *pExprs = NULL;
+ SColIndex *pGroupColIndex = NULL;
+ SColumnInfo *pTagColumnInfo = NULL;
+ SSqlGroupbyExpr *pGroupbyExpr = NULL;
- if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo)) !=
- TSDB_CODE_SUCCESS) {
+ code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo);
+ if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
@@ -6172,7 +6210,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
bool isSTableQuery = false;
STableGroupInfo tableGroupInfo = {0};
-
+ int64_t st = taosGetTimestampUs();
+
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(pTableIdList, 0);
@@ -6182,7 +6221,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
}
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true;
- // TODO: need a macro from TSDB to check if table is super table
// also note there's possibility that only one table in the super table
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
@@ -6193,11 +6231,12 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) {
numOfGroupByCols = 0;
}
-
+
+ qDebug("qmsg:%p query stable, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &tableGroupInfo, pGroupColIndex,
numOfGroupByCols);
if (code != TSDB_CODE_SUCCESS) {
- qError("qmsg:%p failed to QueryStable, reason: %s", pQueryMsg, tstrerror(code));
+ qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
goto _over;
}
} else {
@@ -6208,6 +6247,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
qDebug("qmsg:%p query on %zu tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
}
+
+ int64_t el = taosGetTimestampUs() - st;
+ qDebug("qmsg:%p tag filter completed, numOfTables:%zu, elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
} else {
assert(0);
}
@@ -6247,7 +6289,7 @@ _over:
*pQInfo = NULL;
}
- // if failed to add ref for all meters in this query, abort current query
+ // if failed to add ref for all tables in this query, abort current query
return code;
}
diff --git a/src/query/src/qextbuffer.c b/src/query/src/qExtbuffer.c
similarity index 99%
rename from src/query/src/qextbuffer.c
rename to src/query/src/qExtbuffer.c
index afcf90212366cea331ba100d4c4a8974102d5fc5..69c5f0e24fe6361d41953c35fce1380b97d4e752 100644
--- a/src/query/src/qextbuffer.c
+++ b/src/query/src/qExtbuffer.c
@@ -12,16 +12,15 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+#include "qExtbuffer.h"
#include "os.h"
-#include "tulog.h"
-#include "qextbuffer.h"
+#include "queryLog.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
-#include "ttime.h"
+#include "tulog.h"
#include "tutil.h"
-#include "queryLog.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
diff --git a/src/query/src/qfill.c b/src/query/src/qFill.c
similarity index 99%
rename from src/query/src/qfill.c
rename to src/query/src/qFill.c
index 65951a5b9e22786525d6b9f61672853f3be3d93d..9dec2598bc1bcb3a143ae4906718a741624f392a 100644
--- a/src/query/src/qfill.c
+++ b/src/query/src/qFill.c
@@ -13,9 +13,9 @@
* along with this program. If not, see .
*/
+#include "qFill.h"
#include "os.h"
-#include "qfill.h"
-#include "qextbuffer.h"
+#include "qExtbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
diff --git a/src/query/src/qFilterFunc.c b/src/query/src/qFilterfunc.c
similarity index 99%
rename from src/query/src/qFilterFunc.c
rename to src/query/src/qFilterfunc.c
index 1a95b9fe2122418947c2f6d1b856ed7ca318a5e6..7e9f5c7da5cf6e7e09ac96d1878bc109fc4812ab 100644
--- a/src/query/src/qFilterFunc.c
+++ b/src/query/src/qFilterfunc.c
@@ -554,5 +554,3 @@ __filter_func_t* getValueFilterFuncArray(int32_t type) {
default: return NULL;
}
}
-
-bool supportPrefilter(int32_t type) { return type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR; }
diff --git a/src/query/src/qhistogram.c b/src/query/src/qHistogram.c
similarity index 99%
rename from src/query/src/qhistogram.c
rename to src/query/src/qHistogram.c
index 26482e9f142728f0b097511f98ca2e297e34ef7b..7835d824699c46b2daa4a805ae163b5240f2c4e0 100644
--- a/src/query/src/qhistogram.c
+++ b/src/query/src/qHistogram.c
@@ -14,7 +14,7 @@
*/
#include "os.h"
-#include "qhistogram.h"
+#include "qHistogram.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tlosertree.h"
diff --git a/src/query/src/qparserImpl.c b/src/query/src/qParserImpl.c
similarity index 100%
rename from src/query/src/qparserImpl.c
rename to src/query/src/qParserImpl.c
diff --git a/src/query/src/qpercentile.c b/src/query/src/qPercentile.c
similarity index 99%
rename from src/query/src/qpercentile.c
rename to src/query/src/qPercentile.c
index dc5ecb796a6eb6e4b8be04aaa35618ae2e5ab532..c4490a01e79408ec2bf09527049082f9c05566c8 100644
--- a/src/query/src/qpercentile.c
+++ b/src/query/src/qPercentile.c
@@ -13,12 +13,12 @@
* along with this program. If not, see .
*/
+#include "qPercentile.h"
#include "os.h"
-#include "tulog.h"
-#include "qpercentile.h"
+#include "queryLog.h"
#include "taosdef.h"
#include "taosmsg.h"
-#include "queryLog.h"
+#include "tulog.h"
tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) {
tExtMemBuffer *pBuffer = NULL;
diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c
new file mode 100644
index 0000000000000000000000000000000000000000..de59676e59679dca4feb3540e3f97981f98f10ba
--- /dev/null
+++ b/src/query/src/qResultbuf.c
@@ -0,0 +1,221 @@
+#include "qResultbuf.h"
+#include "hash.h"
+#include "qExtbuffer.h"
+#include "queryLog.h"
+#include "taoserror.h"
+
+int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
+ int32_t pagesize, int32_t inMemPages, void* handle) {
+
+ *pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
+ SDiskbasedResultBuf* pResBuf = *pResultBuf;
+ if (pResBuf == NULL) {
+ return TSDB_CODE_COM_OUT_OF_MEMORY;
+ }
+
+ pResBuf->pageSize = pagesize;
+ pResBuf->numOfPages = inMemPages; // all pages are in buffer in the first place
+ pResBuf->inMemPages = inMemPages;
+ assert(inMemPages <= numOfPages);
+
+ pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
+
+ pResBuf->totalBufSize = pResBuf->numOfPages * pagesize;
+ pResBuf->incStep = 4;
+ pResBuf->allocateId = -1;
+
+ pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
+
+ // init id hash table
+ pResBuf->idsTable = taosHashInit(numOfPages, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
+ pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
+
+ char path[PATH_MAX] = {0};
+ getTmpfilePath("tsdb_qbuf", path);
+ pResBuf->path = strdup(path);
+
+ pResBuf->fd = FD_INITIALIZER;
+ pResBuf->pBuf = NULL;
+ pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
+
+ qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle,
+ pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize);
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
+
+int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
+
+#define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages)
+#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
+
+static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
+ pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
+ if (!FD_VALID(pResultBuf->fd)) {
+ qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
+ return TAOS_SYSTEM_ERROR(errno);
+ }
+
+ assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
+ pResultBuf->numOfPages += pResultBuf->incStep;
+
+ int32_t ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
+ if (ret != TSDB_CODE_SUCCESS) {
+ qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
+ return TAOS_SYSTEM_ERROR(errno);
+ }
+
+ pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
+ if (pResultBuf->pBuf == MAP_FAILED) {
+ qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
+ return TAOS_SYSTEM_ERROR(errno);
+ }
+
+ pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
+ return TSDB_CODE_SUCCESS;
+}
+
+static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNumOfPages) {
+ assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize);
+ int32_t ret = TSDB_CODE_SUCCESS;
+
+ if (pResultBuf->pBuf == NULL) {
+ assert(pResultBuf->fd == FD_INITIALIZER);
+
+ if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
+ return ret;
+ }
+ } else {
+ ret = munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
+ pResultBuf->numOfPages += incNumOfPages;
+
+ /*
+ * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
+ * be insufficient
+ */
+ ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
+ if (ret != TSDB_CODE_SUCCESS) {
+ // dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
+ // strerror(errno));
+ return TSDB_CODE_QRY_NO_DISKSPACE;
+ }
+
+ pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
+ pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
+
+ if (pResultBuf->pBuf == MAP_FAILED) {
+ // dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
+ return TSDB_CODE_QRY_OUT_OF_MEMORY;
+ }
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
+#define NO_AVAILABLE_PAGES(_b) ((_b)->allocateId == (_b)->numOfPages - 1)
+
+static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
+ assert(pResultBuf != NULL);
+
+ char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
+ if (p == NULL) { // it is a new group id
+ return -1;
+ }
+
+ int32_t slot = GET_INT32_VAL(p);
+ assert(slot >= 0 && slot < taosHashGetSize(pResultBuf->idsTable));
+
+ return slot;
+}
+
+static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
+ int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
+ taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
+
+ SArray* pa = taosArrayInit(1, sizeof(int32_t));
+ taosArrayPush(pResultBuf->list, &pa);
+
+ assert(taosArrayGetSize(pResultBuf->list) == taosHashGetSize(pResultBuf->idsTable));
+ return num;
+}
+
+static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
+ int32_t slot = getGroupIndex(pResultBuf, groupId);
+ if (slot < 0) {
+ slot = addNewGroupId(pResultBuf, groupId);
+ }
+
+ SIDList pList = taosArrayGetP(pResultBuf->list, slot);
+ taosArrayPush(pList, &pageId);
+}
+
+tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
+ if (NO_AVAILABLE_PAGES(pResultBuf)) {
+ if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
+ return NULL;
+ }
+ }
+
+ // register new id in this group
+ *pageId = (++pResultBuf->allocateId);
+ registerPageId(pResultBuf, groupId, *pageId);
+
+ // clear memory for the new page
+ tFilePage* page = getResBufPage(pResultBuf, *pageId);
+ memset(page, 0, pResultBuf->pageSize);
+
+ return page;
+}
+
+int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
+
+SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
+ int32_t slot = getGroupIndex(pResultBuf, groupId);
+ if (slot < 0) {
+ return pResultBuf->emptyDummyIdList;
+ } else {
+ return taosArrayGetP(pResultBuf->list, slot);
+ }
+}
+
+void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
+ if (pResultBuf == NULL) {
+ return;
+ }
+
+ if (FD_VALID(pResultBuf->fd)) {
+ qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
+ pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
+
+ close(pResultBuf->fd);
+ munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
+ pResultBuf->pBuf = NULL;
+ } else {
+ qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle,
+ pResultBuf->totalBufSize);
+ }
+
+ unlink(pResultBuf->path);
+ tfree(pResultBuf->path);
+
+ size_t size = taosArrayGetSize(pResultBuf->list);
+ for (int32_t i = 0; i < size; ++i) {
+ SArray* pa = taosArrayGetP(pResultBuf->list, i);
+ taosArrayDestroy(pa);
+ }
+
+ taosArrayDestroy(pResultBuf->list);
+ taosArrayDestroy(pResultBuf->emptyDummyIdList);
+ taosHashCleanup(pResultBuf->idsTable);
+
+ tfree(pResultBuf->iBuf);
+ tfree(pResultBuf);
+}
+
+int32_t getLastPageId(SIDList pList) {
+ size_t size = taosArrayGetSize(pList);
+ return *(int32_t*) taosArrayGet(pList, size - 1);
+}
+
diff --git a/src/query/src/qsyntaxtreefunction.c b/src/query/src/qSyntaxtreefunction.c
similarity index 99%
rename from src/query/src/qsyntaxtreefunction.c
rename to src/query/src/qSyntaxtreefunction.c
index 5719bb0188d6739d327112631c4a0fc82335fb30..2104edfd910bba1a1701800387545c4f58dfb625 100644
--- a/src/query/src/qsyntaxtreefunction.c
+++ b/src/query/src/qSyntaxtreefunction.c
@@ -15,7 +15,7 @@
#include "os.h"
-#include "qsyntaxtreefunction.h"
+#include "qSyntaxtreefunction.h"
#include "taosdef.h"
#include "tutil.h"
diff --git a/src/query/src/qtokenizer.c b/src/query/src/qTokenizer.c
similarity index 100%
rename from src/query/src/qtokenizer.c
rename to src/query/src/qTokenizer.c
diff --git a/src/query/src/qtsbuf.c b/src/query/src/qTsbuf.c
similarity index 99%
rename from src/query/src/qtsbuf.c
rename to src/query/src/qTsbuf.c
index b84fbded38d8ada4d4f426201ea4f747a062e37e..20b29107f5ebaffb6807223f6e9763496ca668aa 100644
--- a/src/query/src/qtsbuf.c
+++ b/src/query/src/qTsbuf.c
@@ -1,7 +1,7 @@
-#include "qtsbuf.h"
+#include "qTsbuf.h"
+#include "taoserror.h"
#include "tscompression.h"
#include "tutil.h"
-#include "taoserror.h"
static int32_t getDataStartOffset();
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo);
diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c
index 4e71de830f31e4a517a78f86ddbcc71d21e2922c..be8447149331ecf3ede4c0873f6ced9c6c385022 100644
--- a/src/query/src/qUtil.c
+++ b/src/query/src/qUtil.c
@@ -26,12 +26,10 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
- assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
size += pQuery->pSelectExpr[i].interBytes;
}
assert(size > 0);
-
return size;
}
@@ -243,7 +241,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memset(s, 0, size);
- resetResultInfo(pResultInfo);
+ RESET_RESULT_INFO(pResultInfo);
}
pWindowRes->numOfRows = 0;
diff --git a/src/query/src/qresultBuf.c b/src/query/src/qresultBuf.c
deleted file mode 100644
index ae1a95179bfd6478dfcb1109edf7d3f36aac97de..0000000000000000000000000000000000000000
--- a/src/query/src/qresultBuf.c
+++ /dev/null
@@ -1,184 +0,0 @@
-#include "qresultBuf.h"
-#include "hash.h"
-#include "qextbuffer.h"
-#include "taoserror.h"
-#include "queryLog.h"
-
-int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) {
- *pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
- SDiskbasedResultBuf* pResBuf = *pResultBuf;
- if (pResBuf == NULL) {
- return TSDB_CODE_COM_OUT_OF_MEMORY;
- }
-
- pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize;
- pResBuf->numOfPages = size;
-
- pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE;
- pResBuf->incStep = 4;
-
- // init id hash table
- pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
- pResBuf->list = taosArrayInit(size, POINTER_BYTES);
-
- char path[4096] = {0};
- getTmpfilePath("tsdb_qbuf", path);
- pResBuf->path = strdup(path);
-
- pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666);
- if (!FD_VALID(pResBuf->fd)) {
- qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno));
- return TAOS_SYSTEM_ERROR(errno);
- }
-
- int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE);
- if (ret != TSDB_CODE_SUCCESS) {
- qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno));
- return TAOS_SYSTEM_ERROR(errno);
- }
-
- pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0);
- if (pResBuf->pBuf == MAP_FAILED) {
- qError("QInfo:%p failed to map temp file: %s. %s", handle, pResBuf->path, strerror(errno));
- return TAOS_SYSTEM_ERROR(errno);
- }
-
- qDebug("QInfo:%p create tmp file for output result:%s, %" PRId64 "bytes", handle, pResBuf->path,
- pResBuf->totalBufSize);
-
- return TSDB_CODE_SUCCESS;
-}
-
-int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
-
-int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
-
-static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOfPages) {
- assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE == pResultBuf->totalBufSize);
-
- int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
- pResultBuf->numOfPages += numOfPages;
-
- /*
- * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
- * be insufficient
- */
- ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE);
- if (ret != 0) {
- // dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
- // strerror(errno));
- return TSDB_CODE_QRY_NO_DISKSPACE;
- }
-
- pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE;
- pResultBuf->pBuf = mmap(NULL, pResultBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
-
- if (pResultBuf->pBuf == MAP_FAILED) {
- // dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
- return TSDB_CODE_QRY_OUT_OF_MEMORY;
- }
-
- return TSDB_CODE_SUCCESS;
-}
-
-static FORCE_INLINE bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) {
- return (pResultBuf->allocateId == pResultBuf->numOfPages - 1);
-}
-
-static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
- assert(pResultBuf != NULL);
-
- char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
- if (p == NULL) { // it is a new group id
- return -1;
- }
-
- int32_t slot = GET_INT32_VAL(p);
- assert(slot >= 0 && slot < taosHashGetSize(pResultBuf->idsTable));
-
- return slot;
-}
-
-static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
- int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
- taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
-
- SArray* pa = taosArrayInit(1, sizeof(int32_t));
- taosArrayPush(pResultBuf->list, &pa);
-
- assert(taosArrayGetSize(pResultBuf->list) == taosHashGetSize(pResultBuf->idsTable));
- return num;
-}
-
-static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
- int32_t slot = getGroupIndex(pResultBuf, groupId);
- if (slot < 0) {
- slot = addNewGroupId(pResultBuf, groupId);
- }
-
- SIDList pList = taosArrayGetP(pResultBuf->list, slot);
- taosArrayPush(pList, &pageId);
-}
-
-tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
- if (noMoreAvailablePages(pResultBuf)) {
- if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
- return NULL;
- }
- }
-
- // register new id in this group
- *pageId = (pResultBuf->allocateId++);
- registerPageId(pResultBuf, groupId, *pageId);
-
- tFilePage* page = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pageId);
-
- // clear memory for the new page
- memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE);
-
- return page;
-}
-
-int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
-
-SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
- int32_t slot = getGroupIndex(pResultBuf, groupId);
- if (slot < 0) {
- return taosArrayInit(1, sizeof(int32_t));
- } else {
- return taosArrayGetP(pResultBuf->list, slot);
- }
-}
-
-void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
- if (pResultBuf == NULL) {
- return;
- }
-
- if (FD_VALID(pResultBuf->fd)) {
- close(pResultBuf->fd);
- }
-
- qDebug("QInfo:%p disk-based output buffer closed, %" PRId64 " bytes, file:%s", handle, pResultBuf->totalBufSize, pResultBuf->path);
- munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
- unlink(pResultBuf->path);
-
- tfree(pResultBuf->path);
-
- size_t size = taosArrayGetSize(pResultBuf->list);
- for (int32_t i = 0; i < size; ++i) {
- SArray* pa = taosArrayGetP(pResultBuf->list, i);
- taosArrayDestroy(pa);
- }
-
- taosArrayDestroy(pResultBuf->list);
- taosHashCleanup(pResultBuf->idsTable);
-
- tfree(pResultBuf);
-}
-
-int32_t getLastPageId(SIDList pList) {
- size_t size = taosArrayGetSize(pList);
- return *(int32_t*) taosArrayGet(pList, size - 1);
-}
-
diff --git a/src/query/tests/astTest.cpp b/src/query/tests/astTest.cpp
index df2708721647410e0230a90e689e1c713c8c2592..99f03a7ff8185f8537ebb243e00bce77992db344 100644
--- a/src/query/tests/astTest.cpp
+++ b/src/query/tests/astTest.cpp
@@ -3,8 +3,8 @@
#include
#include
+#include "qAst.h"
#include "taosmsg.h"
-#include "qast.h"
#include "tsdb.h"
#include "tskiplist.h"
diff --git a/src/query/tests/histogramTest.cpp b/src/query/tests/histogramTest.cpp
index c23f0f59241d7bbc7527f7fe332fe97b666acbcc..4a5f7fbbbed18b0a2c2cd7fff7ad3101cfa11c41 100644
--- a/src/query/tests/histogramTest.cpp
+++ b/src/query/tests/histogramTest.cpp
@@ -9,7 +9,7 @@
#include "tstoken.h"
#include "tutil.h"
-#include "qhistogram.h"
+#include "qHistogram.h"
/* test validate the names for table/database */
TEST(testCase, histogram_binary_search) {
diff --git a/src/query/tests/resultBufferTest.cpp b/src/query/tests/resultBufferTest.cpp
index aaea0836d1fc30483a7e92ea0eaca95020ac4bca..63ed89ab9fe1de2222c6f5ea8ae05b331437f76d 100644
--- a/src/query/tests/resultBufferTest.cpp
+++ b/src/query/tests/resultBufferTest.cpp
@@ -2,15 +2,15 @@
#include
#include
+#include "qResultbuf.h"
#include "taos.h"
-#include "qresultBuf.h"
#include "tsdb.h"
namespace {
// simple test
void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
- int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, NULL);
+ int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL);
int32_t pageId = 0;
int32_t groupId = 0;
@@ -23,7 +23,6 @@ void simpleTest() {
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1);
-
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
destroyResultBuf(pResultBuf, NULL);
diff --git a/src/query/tests/tsBufTest.cpp b/src/query/tests/tsBufTest.cpp
index f81326195778a68259c96e06f713f968be597fd0..e9827518e189a078c6f7cba421a3e9b82f199a40 100644
--- a/src/query/tests/tsBufTest.cpp
+++ b/src/query/tests/tsBufTest.cpp
@@ -5,10 +5,10 @@
#include "taos.h"
#include "tsdb.h"
+#include "qTsbuf.h"
#include "tstoken.h"
#include "ttime.h"
#include "tutil.h"
-#include "qtsbuf.h"
namespace {
/**
diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h
index 40f2dac660fe0d7995015bfeb8b8b9303bd80086..210d95853cf216e93dd09feb4f8e03e15b269471 100644
--- a/src/tsdb/inc/tsdbMain.h
+++ b/src/tsdb/inc/tsdbMain.h
@@ -426,6 +426,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
+void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c
index b29cec3cf9cb11a2059165599ee730f07347d155..1b7db635b337e2576ac61ac100dabad91be793df 100644
--- a/src/tsdb/src/tsdbMemTable.c
+++ b/src/tsdb/src/tsdbMemTable.c
@@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
-static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
@@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK
return 0;
}
-static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
+void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
}
diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c
index eb34805de47d713d78830d7ac8c2a47e45c378d0..37784577c498da13f15e2489c5c1aecc38106ad6 100644
--- a/src/tsdb/src/tsdbRead.c
+++ b/src/tsdb/src/tsdbRead.c
@@ -21,7 +21,7 @@
#include "tcompare.h"
#include "exception.h"
-#include "../../../query/inc/qast.h" // todo move to common module
+#include "../../query/inc/qAst.h" // todo move to common module
#include "tlosertree.h"
#include "tsdb.h"
#include "tsdbMain.h"
@@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle {
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
-static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
- SArray* sa);
+static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbQueryHandle* pQueryHandle);
@@ -240,7 +239,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
- tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
+ tsdbDebug("%p total numOfTable:%zu in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
@@ -331,7 +330,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
} else {
- tsdbDebug("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
+ tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
+ pHandle->qinfo);
}
if (!imemEmpty) {
@@ -343,7 +343,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
} else {
- tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
+ tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
+ pHandle->qinfo);
}
return true;
@@ -354,7 +355,7 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
tSkipListDestroyIter(pCheckInfo->iiter);
}
-SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
+SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) {
SDataRow rmem = NULL, rimem = NULL;
if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
@@ -371,20 +372,35 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
}
if (rmem != NULL && rimem != NULL) {
- if (dataRowKey(rmem) < dataRowKey(rimem)) {
- pCheckInfo->chosen = 0;
- return rmem;
- } else if (dataRowKey(rmem) == dataRowKey(rimem)) {
- // data ts are duplicated, ignore the data in mem
+ TSKEY r1 = dataRowKey(rmem);
+ TSKEY r2 = dataRowKey(rimem);
+
+ if (r1 == r2) { // data ts are duplicated, ignore the data in mem
tSkipListIterNext(pCheckInfo->iter);
pCheckInfo->chosen = 1;
return rimem;
} else {
- pCheckInfo->chosen = 1;
- return rimem;
+ if (ASCENDING_TRAVERSE(order)) {
+ if (r1 < r2) {
+ pCheckInfo->chosen = 0;
+ return rmem;
+ } else {
+ pCheckInfo->chosen = 1;
+ return rimem;
+ }
+ } else {
+ if (r1 < r2) {
+ pCheckInfo->chosen = 1;
+ return rimem;
+ } else {
+ pCheckInfo->chosen = 0;
+ return rmem;
+ }
+ }
}
}
+ // at least one (rmem or rimem) is absent here
if (rmem != NULL) {
pCheckInfo->chosen = 0;
return rmem;
@@ -398,7 +414,7 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
return NULL;
}
-static bool moveToNextRow(STableCheckInfo* pCheckInfo) {
+static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
bool hasNext = false;
if (pCheckInfo->chosen == 0) {
if (pCheckInfo->iter != NULL) {
@@ -412,19 +428,17 @@ static bool moveToNextRow(STableCheckInfo* pCheckInfo) {
if (pCheckInfo->iiter != NULL) {
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
}
- } else {
- if (pCheckInfo->chosen == 1) {
- if (pCheckInfo->iiter != NULL) {
- hasNext = tSkipListIterNext(pCheckInfo->iiter);
- }
+ } else { //pCheckInfo->chosen == 1
+ if (pCheckInfo->iiter != NULL) {
+ hasNext = tSkipListIterNext(pCheckInfo->iiter);
+ }
- if (hasNext) {
- return hasNext;
- }
+ if (hasNext) {
+ return hasNext;
+ }
- if (pCheckInfo->iter != NULL) {
- return tSkipListIterGet(pCheckInfo->iter) != NULL;
- }
+ if (pCheckInfo->iter != NULL) {
+ return tSkipListIterGet(pCheckInfo->iter) != NULL;
}
}
@@ -445,7 +459,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
initTableMemIterator(pHandle, pCheckInfo);
}
- SDataRow row = getSDataRowInTableMem(pCheckInfo);
+ SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order);
if (row == NULL) {
return false;
}
@@ -650,7 +664,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
- SDataRow row = getSDataRowInTableMem(pCheckInfo);
+ SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
@@ -680,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
}
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
- doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
+ doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
/*
* no data in cache, only load data from file
@@ -696,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->mixBlock = false;
cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
+ pCheckInfo->lastKey = cur->lastKey;
}
}
@@ -719,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = 0;
}
- doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
+ doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
@@ -736,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = pBlock->numOfRows - 1;
}
- doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
+ doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
@@ -892,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
pQueryHandle->cur.win.ekey = tsArray[end];
pQueryHandle->cur.lastKey = tsArray[end] + step;
-
+
return numOfRows + num;
}
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
- STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) {
+ int32_t numOfCols, STable* pTable) {
char* pData = NULL;
// the schema version info is embeded in SDataRow
@@ -958,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
-static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
- SArray* sa) {
+static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
@@ -972,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
- STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = cur->pos;
@@ -1033,7 +1046,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL;
do {
- SDataRow row = getSDataRowInTableMem(pCheckInfo);
+ SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
if (row == NULL) {
break;
}
@@ -1051,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
- copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable);
+ copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
@@ -1061,9 +1074,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->lastKey = key + step;
cur->mixBlock = true;
- moveToNextRow(pCheckInfo);
+ moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
- moveToNextRow(pCheckInfo);
+ moveToNextRowInMem(pCheckInfo);
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
@@ -1072,7 +1085,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
- moveToNextRow(pCheckInfo);
+ moveToNextRowInMem(pCheckInfo);
}
int32_t start = -1;
@@ -1376,7 +1389,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
* }
*/
- tsdbDebug("%p %d data blocks sort completed", pQueryHandle, cnt);
+ tsdbDebug("%p %d data blocks sort completed, %p", pQueryHandle, cnt, pQueryHandle->qinfo);
cleanBlockOrderSupporter(&sup, numOfTables);
free(pTree);
@@ -1391,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
int32_t numOfBlocks = 0;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
-
+
+ STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
+ STimeWindow win = TSWINDOW_INITIALIZER;
+
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
+ tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey);
+
+ // current file are not overlapped with query time window, ignore remain files
+ if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
+ (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
+ tsdbDebug("%p remain files are not qualified for qrange:%"PRId64"-%"PRId64", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo)
+ pQueryHandle->pFileGroup = NULL;
+ break;
+ }
+
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break;
}
@@ -1750,11 +1776,10 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
win->skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs();
- STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj;
do {
- SDataRow row = getSDataRowInTableMem(pCheckInfo);
+ SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
if (row == NULL) {
break;
}
@@ -1772,14 +1797,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
win->ekey = key;
- copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);
+ copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
if (++numOfRows >= maxRowsToRead) {
- moveToNextRow(pCheckInfo);
+ moveToNextRowInMem(pCheckInfo);
break;
}
- } while(moveToNextRow(pCheckInfo));
+ } while(moveToNextRowInMem(pCheckInfo));
assert(numOfRows <= maxRowsToRead);
@@ -1869,7 +1894,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
}
- // todo opt perf
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
@@ -1961,43 +1985,20 @@ static void destroyHelper(void* param) {
free(param);
}
-#define TAG_INVALID_COLUMN_INDEX -2
-static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) {
- // filter on table name(TBNAME)
- if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
- return TSDB_TBNAME_COLUMN_INDEX;
- }
-
- for(int32_t i = 0; i < schemaNCols(pTSchema); ++i) {
- STColumn* pColumn = &pTSchema->columns[i];
- if (pColumn->bytes == pSchema->bytes && pColumn->type == pSchema->type && pColumn->colId == pSchema->colId) {
- return i;
- }
- }
-
- return -2;
-}
-
void filterPrepare(void* expr, void* param) {
tExprNode* pExpr = (tExprNode*)expr;
if (pExpr->_node.info != NULL) {
return;
}
- int32_t i = 0;
pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
- STSchema* pTSSchema = (STSchema*) param;
-
+ STSchema* pTSSchema = (STSchema*) param;
tQueryInfo* pInfo = pExpr->_node.info;
tVariant* pCond = pExpr->_node.pRight->pVal;
SSchema* pSchema = pExpr->_node.pLeft->pSchema;
- int32_t index = getTagColumnIndex(pTSSchema, pSchema);
- assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX) || index == TAG_INVALID_COLUMN_INDEX);
-
pInfo->sch = *pSchema;
- pInfo->colIndex = index;
pInfo->optr = pExpr->_node.optr;
pInfo->compare = getComparFunc(pSchema->type, pInfo->optr);
pInfo->param = pTSSchema;
@@ -2143,7 +2144,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
char* val = NULL;
- if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
+ if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
val = (char*) TABLE_NAME(pTable);
} else {
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h
index f7c69e3973ad6fdefd4ef33f4a813221eedebe6d..34f35c380745172f5fc6a6d4aaf094cdf82c68ac 100644
--- a/src/util/inc/tutil.h
+++ b/src/util/inc/tutil.h
@@ -35,12 +35,12 @@ extern "C" {
#define WCHAR wchar_t
#define tfree(x) \
- { \
+ do { \
if (x) { \
free((void *)(x)); \
x = 0; \
} \
- }
+ } while(0);
#define tstrncpy(dst, src, size) \
do { \
diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c
index 3764df4afce21bea608e1a33351a343defcf7752..7b73b1e17c54a2278ebd15420eea96bb84bfe547 100644
--- a/src/util/src/tutil.c
+++ b/src/util/src/tutil.c
@@ -522,7 +522,7 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char* tdengineTmpFileNamePrefix = "tdengine-";
- char tmpPath[PATH_MAX] = {0};
+ char tmpPath[PATH_MAX];
#ifdef WINDOWS
char *tmpDir = getenv("tmp");