提交 55e3352d 编写于 作者: H Haojun Liao

[td-225] refactor codes. opt resbuf perf. rename sources file

上级 1593712a
...@@ -30,7 +30,7 @@ extern "C" { ...@@ -30,7 +30,7 @@ extern "C" {
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tutil.h" #include "tutil.h"
#include "qExecutor.h" #include "qexecutor.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "qtsbuf.h" #include "qtsbuf.h"
#include "tcmdtype.h" #include "tcmdtype.h"
......
...@@ -74,7 +74,7 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ ...@@ -74,7 +74,7 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {} void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
void doFinalizer(SQLFunctionCtx *pCtx) { resetResultInfo(GET_RES_INFO(pCtx)); } void doFinalizer(SQLFunctionCtx *pCtx) { RESET_RESULT_INFO(GET_RES_INFO(pCtx)); }
typedef struct tValuePair { typedef struct tValuePair {
tVariant v; tVariant v;
...@@ -330,12 +330,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -330,12 +330,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
* the numOfRes should be kept, since it may be used later
* and allow the ResultInfo to be re initialized
*/
void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; }
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf) { void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf) {
assert(pResInfo->interResultBuf == NULL); assert(pResInfo->interResultBuf == NULL);
......
...@@ -242,35 +242,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -242,35 +242,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pEpSet) tscMgmtEpSet = *pEpSet; if (pEpSet) tscMgmtEpSet = *pEpSet;
} }
if (rpcMsg->pCont == NULL) {
rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
} else {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
// if (rpcMsg->code != TSDB_CODE_RPC_NETWORK_UNAVAIL) {
// if (pCmd->command == TSDB_SQL_CONNECT) {
// rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// rpcFreeCont(rpcMsg->pCont);
// return;
// }
// if (pCmd->command == TSDB_SQL_HB) { int32_t cmd = pCmd->command;
// rpcMsg->code = TSDB_CODE_RPC_NOT_READY; if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
// rpcFreeCont(rpcMsg->pCont); (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
// return; rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
// } rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
// if (pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
// pCmd->command == TSDB_SQL_STABLEVGROUP || pCmd->command == TSDB_SQL_SHOW ||
// pCmd->command == TSDB_SQL_RETRIEVE) {
// // get table meta/vgroup query will not retry, do nothing
// }
// }
if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_INSERT ||
pCmd->command == TSDB_SQL_UPDATE_TAGS_VAL) &&
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1; pSql->cmd.submitSchema = 1;
...@@ -283,14 +264,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -283,14 +264,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);
// if there is an error occurring, proceed to the following error handling procedure. // if there is an error occurring, proceed to the following error handling procedure.
// todo add test cases
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
} }
} }
}
pRes->rspLen = 0; pRes->rspLen = 0;
......
...@@ -19,15 +19,15 @@ ...@@ -19,15 +19,15 @@
#include "hash.h" #include "hash.h"
#include "qfill.h" #include "qfill.h"
#include "qresultBuf.h" #include "qresultbuf.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "qtsbuf.h" #include "qtsbuf.h"
#include "query.h"
#include "taosdef.h" #include "taosdef.h"
#include "tarray.h" #include "tarray.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "query.h"
struct SColumnFilterElem; struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
...@@ -158,7 +158,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -158,7 +158,7 @@ typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage; int32_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo; SFillInfo* pFillInfo;
......
...@@ -33,14 +33,19 @@ typedef struct SDiskbasedResultBuf { ...@@ -33,14 +33,19 @@ typedef struct SDiskbasedResultBuf {
int32_t fd; // data file fd int32_t fd; // data file fd
int32_t allocateId; // allocated page id int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages int32_t incStep; // minimum allocated pages
char* pBuf; // mmap buffer pointer void* pBuf; // mmap buffer pointer
char* path; // file path char* path; // file path
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* idsTable; // id hash table SHashObj* idsTable; // id hash table
SIDList list; // for each id, there is a page id list SIDList list; // for each id, there is a page id list
void* iBuf; // inmemory buf
void* handle; // for debug purpose
} SDiskbasedResultBuf; } SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5) #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
#define DEFAULT_INMEM_BUF_PAGES 10
/** /**
* create disk-based result buffer * create disk-based result buffer
...@@ -49,7 +54,8 @@ typedef struct SDiskbasedResultBuf { ...@@ -49,7 +54,8 @@ typedef struct SDiskbasedResultBuf {
* @param rowSize * @param rowSize
* @return * @return
*/ */
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle); int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
int32_t inMemPages, void* handle);
/** /**
* *
...@@ -81,8 +87,14 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); ...@@ -81,8 +87,14 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id * @param id
* @return * @return
*/ */
#define GET_RES_BUF_PAGE_BY_ID(buf, id) ((tFilePage*)((buf)->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE*(id))) //#define getResBufPage(buf, id) ((tFilePage*)((buf)->pBuf + (buf)->pageSize * (id)))
static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
if (id < pResultBuf->inMemPages) {
return pResultBuf->iBuf + id * pResultBuf->pageSize;
} else {
return pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize;
}
}
/** /**
* get the total buffer size in the format of disk file * get the total buffer size in the format of disk file
* @param pResultBuf * @param pResultBuf
......
...@@ -49,7 +49,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3 ...@@ -49,7 +49,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
assert(pResult != NULL && pRuntimeEnv != NULL); assert(pResult != NULL && pRuntimeEnv != NULL);
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
tFilePage *page = GET_RES_BUF_PAGE_BY_ID(pRuntimeEnv->pResultBuf, pResult->pos.pageId); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery); int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
...@@ -59,6 +59,4 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3 ...@@ -59,6 +59,4 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
__filter_func_t *getRangeFilterFuncArray(int32_t type); __filter_func_t *getRangeFilterFuncArray(int32_t type);
__filter_func_t *getValueFilterFuncArray(int32_t type); __filter_func_t *getValueFilterFuncArray(int32_t type);
bool supportPrefilter(int32_t type);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
...@@ -255,7 +255,15 @@ extern int32_t functionCompatList[]; // compatible check array list ...@@ -255,7 +255,15 @@ extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval); bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval);
void resetResultInfo(SResultInfo *pResInfo); /**
* the numOfRes should be kept, since it may be used later
* and allow the ResultInfo to be re initialized
*/
#define RESET_RESULT_INFO(_r) \
do { \
(_r)->initialized = false; \
} while (0)
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf); void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) { static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
......
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
#include "exception.h" #include "exception.h"
#include "hash.h" #include "hash.h"
#include "qExecutor.h" #include "qast.h"
#include "qUtil.h" #include "qexecutor.h"
#include "qresultBuf.h" #include "qresultbuf.h"
#include "query.h" #include "query.h"
#include "queryLog.h" #include "queryLog.h"
#include "qast.h" #include "qutil.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tscompression.h" #include "tscompression.h"
#include "ttime.h" #include "ttime.h"
...@@ -233,9 +233,7 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) { ...@@ -233,9 +233,7 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) {
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i); SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (pColIndex->flag == TSDB_COL_NORMAL) { if (pColIndex->flag == TSDB_COL_NORMAL) {
/* //make sure the normal column locates at the second position if tbname exists in group by clause
* make sure the normal column locates at the second position if tbname exists in group by clause
*/
if (pGroupbyExpr->numOfGroupCols > 1) { if (pGroupbyExpr->numOfGroupCols > 1) {
assert(pColIndex->colIndex > 0); assert(pColIndex->colIndex > 0);
} }
...@@ -294,6 +292,17 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { ...@@ -294,6 +292,17 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
return false; return false;
} }
bool isProjQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functId = pQuery->pSelectExpr[i].base.functionId;
if (functId != TSDB_FUNC_PRJ && functId != TSDB_FUNC_TAGPRJ) {
return false;
}
}
return true;
}
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; } bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; }
static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) { static bool limitResults(SQueryRuntimeEnv* pRuntimeEnv) {
...@@ -470,7 +479,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult ...@@ -470,7 +479,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
pData = getNewDataBuf(pResultBuf, sid, &pageId); pData = getNewDataBuf(pResultBuf, sid, &pageId);
} else { } else {
pageId = getLastPageId(list); pageId = getLastPageId(list);
pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, pageId); pData = getResBufPage(pResultBuf, pageId);
if (pData->num >= numOfRowsPerPage) { if (pData->num >= numOfRowsPerPage) {
pData = getNewDataBuf(pResultBuf, sid, &pageId); pData = getNewDataBuf(pResultBuf, sid, &pageId);
...@@ -1003,7 +1012,6 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat ...@@ -1003,7 +1012,6 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break; case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break;
} }
// assert(pRuntimeEnv->windowResInfo.hashList->size <= 2);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes, true); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes, true);
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return -1; return -1;
...@@ -1201,9 +1209,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1201,9 +1209,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
continue; continue;
} }
// interval window query // interval window query, decide the time window according to the primary timestamp
if (QUERY_IS_INTERVAL_QUERY(pQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
// decide the time window according to the primary timestamp
int64_t ts = tsCols[offset]; int64_t ts = tsCols[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
...@@ -1225,8 +1232,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1225,8 +1232,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
while (1) { while (1) {
GET_NEXT_TIMEWINDOW(pQuery, &nextWin); GET_NEXT_TIMEWINDOW(pQuery, &nextWin);
if (/*pWindowResInfo->startTime > nextWin.skey ||*/ if ((nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.skey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { (nextWin.skey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
break; break;
} }
...@@ -1484,6 +1490,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1484,6 +1490,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
goto _clean; goto _clean;
} }
qDebug("QInfo:%p setup runtime env1", GET_QINFO_ADDR(pRuntimeEnv));
pRuntimeEnv->offset[0] = 0; pRuntimeEnv->offset[0] = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base; SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
...@@ -1528,6 +1536,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1528,6 +1536,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
} }
} }
qDebug("QInfo:%p setup runtime env2", GET_QINFO_ADDR(pRuntimeEnv));
// set the order information for top/bottom query // set the order information for top/bottom query
int32_t functionId = pCtx->functionId; int32_t functionId = pCtx->functionId;
...@@ -1548,17 +1558,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1548,17 +1558,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
} }
} }
qDebug("QInfo:%p setup runtime env3", GET_QINFO_ADDR(pRuntimeEnv));
char* buf = (char*) pRuntimeEnv->resultInfo + sizeof(SResultInfo) * pQuery->numOfOutput; char* buf = (char*) pRuntimeEnv->resultInfo + sizeof(SResultInfo) * pQuery->numOfOutput;
// set the intermediate result output buffer // set the intermediate result output buffer
setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery, buf); setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
qDebug("QInfo:%p setup runtime env4", GET_QINFO_ADDR(pRuntimeEnv));
// if it is group by normal column, do not set output buffer, the output buffer is pResult // if it is group by normal column, do not set output buffer, the output buffer is pResult
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !pRuntimeEnv->stableQuery) { if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
} }
qDebug("QInfo:%p setup runtime env5", GET_QINFO_ADDR(pRuntimeEnv));
setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx); setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx);
qDebug("QInfo:%p init completed", GET_QINFO_ADDR(pRuntimeEnv));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_clean: _clean:
...@@ -1910,9 +1928,20 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { ...@@ -1910,9 +1928,20 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
return num; return num;
} }
static FORCE_INLINE int32_t getNumOfRowsInResultPage(SQuery *pQuery, bool topBotQuery, bool isSTableQuery) { static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) {
int32_t rowSize = pQuery->rowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, topBotQuery, isSTableQuery); SQuery* pQuery = pRuntimeEnv->pQuery;
return (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize;
*rowsize = pQuery->rowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
int32_t overhead = sizeof(tFilePage);
// one page contains at least two rows
*ps = DEFAULT_INTERN_BUF_PAGE_SIZE;
while(((*rowsize) * 2) > (*ps) - overhead) {
*ps = (*ps << 1u);
}
pRuntimeEnv->numOfRowsPerPage = ((*ps) - sizeof(tFilePage)) / (*rowsize);
} }
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
...@@ -2038,8 +2067,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, ...@@ -2038,8 +2067,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
} else { // check if this data block is required to load } else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block. // Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, loading current // If current data block is contained by all possible time window, do not load current data block.
// data block is not needed.
if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) {
status = BLK_DATA_ALL_NEEDED; status = BLK_DATA_ALL_NEEDED;
} }
...@@ -2424,7 +2452,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes ...@@ -2424,7 +2452,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes; pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes;
pCtx[i].currentStage = FIRST_STAGE_MERGE; pCtx[i].currentStage = FIRST_STAGE_MERGE;
resetResultInfo(pCtx[i].resultInfo); RESET_RESULT_INFO(pCtx[i].resultInfo);
aAggs[functionId].init(&pCtx[i]); aAggs[functionId].init(&pCtx[i]);
} }
...@@ -2661,7 +2689,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2661,7 +2689,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
int32_t size = taosArrayGetSize(list); int32_t size = taosArrayGetSize(list);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
int32_t* pgId = taosArrayGet(list, i); int32_t* pgId = taosArrayGet(list, i);
tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId); tFilePage *pData = getResBufPage(pResultBuf, *pgId);
total += pData->num; total += pData->num;
} }
...@@ -2670,7 +2698,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2670,7 +2698,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
int32_t offset = 0; int32_t offset = 0;
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
int32_t* pgId = taosArrayGet(list, j); int32_t* pgId = taosArrayGet(list, j);
tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId); tFilePage *pData = getResBufPage(pResultBuf, *pgId);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
...@@ -2860,10 +2888,10 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { ...@@ -2860,10 +2888,10 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t capacity = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
// the base value for group result, since the maximum number of table for each vnode will not exceed 100,000. // the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
int32_t pageId = -1; int32_t pageId = -1;
int32_t capacity = pResultBuf->numOfRowsPerPage;
int32_t remain = pQuery->sdata[0]->num; int32_t remain = pQuery->sdata[0]->num;
int32_t offset = 0; int32_t offset = 0;
...@@ -3033,7 +3061,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3033,7 +3061,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
* set the output buffer information and intermediate buffer * set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc. * not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
*/ */
resetResultInfo(&pRuntimeEnv->resultInfo[i]); RESET_RESULT_INFO(&pRuntimeEnv->resultInfo[i]);
pCtx->resultInfo = &pRuntimeEnv->resultInfo[i]; pCtx->resultInfo = &pRuntimeEnv->resultInfo[i];
// set the timestamp output buffer for top/bottom/diff query // set the timestamp output buffer for top/bottom/diff query
...@@ -3072,7 +3100,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { ...@@ -3072,7 +3100,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
pRuntimeEnv->pCtx[j].ptsOutputBuf += TSDB_KEYSIZE * output; pRuntimeEnv->pCtx[j].ptsOutputBuf += TSDB_KEYSIZE * output;
} }
resetResultInfo(pRuntimeEnv->pCtx[j].resultInfo); RESET_RESULT_INFO(pRuntimeEnv->pCtx[j].resultInfo);
} }
} }
...@@ -3326,8 +3354,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { ...@@ -3326,8 +3354,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
if (pRuntimeEnv->pSecQueryHandle == NULL) { if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno); longjmp(pRuntimeEnv->env, terrno);
} }
pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN; pRuntimeEnv->scanFlag = REPEAT_SCAN;
...@@ -3462,7 +3490,6 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { ...@@ -3462,7 +3490,6 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
return; return;
} }
int32_t GROUPRESULTID = 1;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
sizeof(groupIndex), true); sizeof(groupIndex), true);
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
...@@ -3474,7 +3501,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { ...@@ -3474,7 +3501,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
* all group belong to one result set, and each group result has different group id so set the id to be one * all group belong to one result set, and each group result has different group id so set the id to be one
*/ */
if (pWindowRes->pos.pageId == -1) { if (pWindowRes->pos.pageId == -1) {
if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage) != if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return; return;
} }
...@@ -4170,7 +4197,6 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { ...@@ -4170,7 +4197,6 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
} }
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) { int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
qDebug("QInfo:%p start to init qhandle", pQInfo);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -4209,33 +4235,42 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4209,33 +4235,42 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
return code; return code;
} }
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, pRuntimeEnv->topBotQuery, isSTableQuery); int32_t ps = DEFAULT_PAGE_SIZE;
int32_t rowsize = 0;
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) { if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) {
int32_t rows = getInitialPageNum(pQInfo); int32_t numOfPages = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfPages, rowsize, ps, numOfPages, pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
int16_t type = TSDB_DATA_TYPE_NULL; int16_t type = TSDB_DATA_TYPE_NULL;
int32_t threshold = 0;
if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags; if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
threshold = 4000;
} else { } else {
type = TSDB_DATA_TYPE_INT; // group id type = TSDB_DATA_TYPE_INT; // group id
threshold = GET_NUM_OF_TABLEGROUP(pQInfo);
if (threshold < 8) {
threshold = 8;
}
} }
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 32, 4096, type); code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 8, threshold, type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { } else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
int32_t rows = getInitialPageNum(pQInfo); int32_t numOfResultRows = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo); getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, numOfResultRows, rowsize, ps, numOfResultRows, pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4247,7 +4282,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4247,7 +4282,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type = TSDB_DATA_TYPE_TIMESTAMP; type = TSDB_DATA_TYPE_TIMESTAMP;
} }
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type); code = initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, numOfResultRows, 4096, type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -5705,6 +5740,7 @@ static void calResultBufSize(SQuery* pQuery) { ...@@ -5705,6 +5740,7 @@ static void calResultBufSize(SQuery* pQuery) {
const int32_t RESULT_MSG_MIN_ROWS = 8192; const int32_t RESULT_MSG_MIN_ROWS = 8192;
const float RESULT_THRESHOLD_RATIO = 0.85; const float RESULT_THRESHOLD_RATIO = 0.85;
if (isProjQuery(pQuery)) {
int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->rowSize; int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->rowSize;
if (numOfRes < RESULT_MSG_MIN_ROWS) { if (numOfRes < RESULT_MSG_MIN_ROWS) {
numOfRes = RESULT_MSG_MIN_ROWS; numOfRes = RESULT_MSG_MIN_ROWS;
...@@ -5712,6 +5748,10 @@ static void calResultBufSize(SQuery* pQuery) { ...@@ -5712,6 +5748,10 @@ static void calResultBufSize(SQuery* pQuery) {
pQuery->rec.capacity = numOfRes; pQuery->rec.capacity = numOfRes;
pQuery->rec.threshold = numOfRes * RESULT_THRESHOLD_RATIO; pQuery->rec.threshold = numOfRes * RESULT_THRESHOLD_RATIO;
} else { // in case of non-prj query, a smaller output buffer will be used.
pQuery->rec.capacity = 4096;
pQuery->rec.threshold = pQuery->rec.capacity * RESULT_THRESHOLD_RATIO;
}
} }
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
...@@ -5723,6 +5763,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5723,6 +5763,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
if (pQInfo == NULL) { if (pQInfo == NULL) {
goto _cleanup_qinfo; goto _cleanup_qinfo;
} }
// to make sure third party won't overwrite this structure // to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo; pQInfo->signature = pQInfo;
pQInfo->tableGroupInfo = *pTableGroupInfo; pQInfo->tableGroupInfo = *pTableGroupInfo;
...@@ -6041,11 +6082,10 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -6041,11 +6082,10 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->sdata); tfree(pQuery->sdata);
tfree(pQuery); tfree(pQuery);
pQInfo->signature = 0;
qDebug("QInfo:%p QInfo is freed", pQInfo); qDebug("QInfo:%p QInfo is freed", pQInfo);
// destroy signature, in order to avoid the query process pass the object safety check
memset(pQInfo, 0, sizeof(SQInfo));
tfree(pQInfo); tfree(pQInfo);
} }
...@@ -6209,7 +6249,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo ...@@ -6209,7 +6249,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
} }
int64_t el = taosGetTimestampUs() - st; int64_t el = taosGetTimestampUs() - st;
qDebug("qmsg:%p tag filter completed, elapsed time:%"PRId64"us", pQueryMsg, el); qDebug("qmsg:%p tag filter completed, numOfTables:%zu, elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
} else { } else {
assert(0); assert(0);
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "qExecutor.h" #include "qexecutor.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcompare.h" #include "tcompare.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
...@@ -554,5 +554,3 @@ __filter_func_t* getValueFilterFuncArray(int32_t type) { ...@@ -554,5 +554,3 @@ __filter_func_t* getValueFilterFuncArray(int32_t type) {
default: return NULL; default: return NULL;
} }
} }
bool supportPrefilter(int32_t type) { return type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR; }
#include "qresultBuf.h" #include "qresultbuf.h"
#include "hash.h" #include "hash.h"
#include "qextbuffer.h" #include "qextbuffer.h"
#include "taoserror.h"
#include "queryLog.h" #include "queryLog.h"
#include "taoserror.h"
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
int32_t pagesize, int32_t inMemPages, void* handle) {
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf)); *pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
SDiskbasedResultBuf* pResBuf = *pResultBuf; SDiskbasedResultBuf* pResBuf = *pResultBuf;
if (pResBuf == NULL) { if (pResBuf == NULL) {
return TSDB_CODE_COM_OUT_OF_MEMORY; return TSDB_CODE_COM_OUT_OF_MEMORY;
} }
pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize; pResBuf->pageSize = pagesize;
pResBuf->numOfPages = size; pResBuf->numOfPages = inMemPages; // all pages are in buffer in the first place
pResBuf->inMemPages = inMemPages;
assert(inMemPages <= numOfPages);
pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE; pResBuf->totalBufSize = pResBuf->numOfPages * pagesize;
pResBuf->incStep = 4; pResBuf->incStep = 4;
pResBuf->allocateId = -1;
pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
// init id hash table // init id hash table
pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->idsTable = taosHashInit(numOfPages, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->list = taosArrayInit(size, POINTER_BYTES); pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
char path[4096] = {0}; char path[PATH_MAX] = {0};
getTmpfilePath("tsdb_qbuf", path); getTmpfilePath("tsdb_qbuf", path);
pResBuf->path = strdup(path); pResBuf->path = strdup(path);
pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666); pResBuf->fd = FD_INITIALIZER;
if (!FD_VALID(pResBuf->fd)) { pResBuf->pBuf = NULL;
qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno));
qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle,
pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize);
return TSDB_CODE_SUCCESS;
}
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
#define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages)
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
if (!FD_VALID(pResultBuf->fd)) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE); assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
pResultBuf->numOfPages += pResultBuf->incStep;
int32_t ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno)); qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0); pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
if (pResBuf->pBuf == MAP_FAILED) { if (pResultBuf->pBuf == MAP_FAILED) {
qError("QInfo:%p failed to map temp file: %s. %s", handle, pResBuf->path, strerror(errno)); qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
qDebug("QInfo:%p create tmp file for output result:%s, %" PRId64 "bytes", handle, pResBuf->path, pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
pResBuf->totalBufSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); } static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNumOfPages) {
assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize);
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; } int32_t ret = TSDB_CODE_SUCCESS;
static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOfPages) { if (pResultBuf->pBuf == NULL) {
assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE == pResultBuf->totalBufSize); assert(pResultBuf->fd == FD_INITIALIZER);
int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize); if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
pResultBuf->numOfPages += numOfPages; return ret;
}
} else {
ret = munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->numOfPages += incNumOfPages;
/* /*
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient * be insufficient
*/ */
ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE); ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != 0) { if (ret != TSDB_CODE_SUCCESS) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, // dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno)); // strerror(errno));
return TSDB_CODE_QRY_NO_DISKSPACE; return TSDB_CODE_QRY_NO_DISKSPACE;
} }
pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE; pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
pResultBuf->pBuf = mmap(NULL, pResultBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0); pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
if (pResultBuf->pBuf == MAP_FAILED) { if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); // dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) { #define NO_AVAILABLE_PAGES(_b) ((_b)->allocateId == (_b)->numOfPages - 1)
return (pResultBuf->allocateId == pResultBuf->numOfPages - 1);
}
static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL); assert(pResultBuf != NULL);
...@@ -121,20 +151,19 @@ static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int ...@@ -121,20 +151,19 @@ static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int
} }
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
if (noMoreAvailablePages(pResultBuf)) { if (NO_AVAILABLE_PAGES(pResultBuf)) {
if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) { if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
} }
// register new id in this group // register new id in this group
*pageId = (pResultBuf->allocateId++); *pageId = (++pResultBuf->allocateId);
registerPageId(pResultBuf, groupId, *pageId); registerPageId(pResultBuf, groupId, *pageId);
tFilePage* page = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pageId);
// clear memory for the new page // clear memory for the new page
memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE); tFilePage* page = getResBufPage(pResultBuf, *pageId);
memset(page, 0, pResultBuf->pageSize);
return page; return page;
} }
...@@ -156,13 +185,18 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { ...@@ -156,13 +185,18 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
} }
if (FD_VALID(pResultBuf->fd)) { if (FD_VALID(pResultBuf->fd)) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
close(pResultBuf->fd); close(pResultBuf->fd);
munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->pBuf = NULL;
} else {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle,
pResultBuf->totalBufSize);
} }
qDebug("QInfo:%p disk-based output buffer closed, %" PRId64 " bytes, file:%s", handle, pResultBuf->totalBufSize, pResultBuf->path);
munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
unlink(pResultBuf->path); unlink(pResultBuf->path);
tfree(pResultBuf->path); tfree(pResultBuf->path);
size_t size = taosArrayGetSize(pResultBuf->list); size_t size = taosArrayGetSize(pResultBuf->list);
......
...@@ -19,19 +19,17 @@ ...@@ -19,19 +19,17 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "ttime.h" #include "ttime.h"
#include "qExecutor.h" #include "qexecutor.h"
#include "qUtil.h" #include "qutil.h"
int32_t getOutputInterResultBufSize(SQuery* pQuery) { int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0; int32_t size = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
size += pQuery->pSelectExpr[i].interBytes; size += pQuery->pSelectExpr[i].interBytes;
} }
assert(size > 0); assert(size > 0);
return size; return size;
} }
...@@ -243,7 +241,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow ...@@ -243,7 +241,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memset(s, 0, size); memset(s, 0, size);
resetResultInfo(pResultInfo); RESET_RESULT_INFO(pResultInfo);
} }
pWindowRes->numOfRows = 0; pWindowRes->numOfRows = 0;
......
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
#include <cassert> #include <cassert>
#include <iostream> #include <iostream>
#include "qresultbuf.h"
#include "taos.h" #include "taos.h"
#include "qresultBuf.h"
#include "tsdb.h" #include "tsdb.h"
namespace { namespace {
......
...@@ -1376,7 +1376,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1376,7 +1376,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
* } * }
*/ */
tsdbDebug("%p %d data blocks sort completed", pQueryHandle, cnt); tsdbDebug("%p %d data blocks sort completed, %p", pQueryHandle, cnt, pQueryHandle->qinfo);
cleanBlockOrderSupporter(&sup, numOfTables); cleanBlockOrderSupporter(&sup, numOfTables);
free(pTree); free(pTree);
......
...@@ -522,7 +522,7 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP ...@@ -522,7 +522,7 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char* tdengineTmpFileNamePrefix = "tdengine-"; const char* tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX] = {0}; char tmpPath[PATH_MAX];
#ifdef WINDOWS #ifdef WINDOWS
char *tmpDir = getenv("tmp"); char *tmpDir = getenv("tmp");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册