未验证 提交 3f8c0b07 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2721 from taosdata/feature/query

Feature/query
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include "qextbuffer.h" #include "qExtbuffer.h"
#include "qfill.h" #include "qFill.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tsclient.h" #include "tsclient.h"
......
...@@ -23,11 +23,11 @@ extern "C" { ...@@ -23,11 +23,11 @@ extern "C" {
/* /*
* @date 2018/09/30 * @date 2018/09/30
*/ */
#include "os.h"
#include "tbuffer.h"
#include "exception.h" #include "exception.h"
#include "qextbuffer.h" #include "os.h"
#include "qExtbuffer.h"
#include "taosdef.h" #include "taosdef.h"
#include "tbuffer.h"
#include "tscLocalMerge.h" #include "tscLocalMerge.h"
#include "tsclient.h" #include "tsclient.h"
......
...@@ -31,8 +31,8 @@ extern "C" { ...@@ -31,8 +31,8 @@ extern "C" {
#include "tutil.h" #include "tutil.h"
#include "qExecutor.h" #include "qExecutor.h"
#include "qTsbuf.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "qtsbuf.h"
#include "tcmdtype.h" #include "tcmdtype.h"
// forward declaration // forward declaration
......
...@@ -14,15 +14,15 @@ ...@@ -14,15 +14,15 @@
*/ */
#include "os.h" #include "os.h"
#include "qextbuffer.h" #include "qAst.h"
#include "qfill.h" #include "qExtbuffer.h"
#include "qhistogram.h" #include "qFill.h"
#include "qpercentile.h" #include "qHistogram.h"
#include "qsyntaxtreefunction.h" #include "qPercentile.h"
#include "qtsbuf.h" #include "qSyntaxtreefunction.h"
#include "qTsbuf.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "qast.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscompression.h" #include "tscompression.h"
...@@ -74,7 +74,7 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ ...@@ -74,7 +74,7 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {} void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
void doFinalizer(SQLFunctionCtx *pCtx) { resetResultInfo(GET_RES_INFO(pCtx)); } void doFinalizer(SQLFunctionCtx *pCtx) { RESET_RESULT_INFO(GET_RES_INFO(pCtx)); }
typedef struct tValuePair { typedef struct tValuePair {
tVariant v; tVariant v;
...@@ -330,12 +330,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -330,12 +330,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
* the numOfRes should be kept, since it may be used later
* and allow the ResultInfo to be re initialized
*/
void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; }
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf) { void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf) {
assert(pResInfo->interResultBuf == NULL); assert(pResInfo->interResultBuf == NULL);
......
...@@ -16,14 +16,14 @@ ...@@ -16,14 +16,14 @@
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcache.h" #include "qExtbuffer.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "taosdef.h" #include "taosdef.h"
#include "tcache.h"
#include "tname.h"
#include "tscLog.h" #include "tscLog.h"
#include "qextbuffer.h" #include "tscUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tname.h" #include "tsclient.h"
static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength); static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength);
......
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "qAst.h"
#include "taos.h" #include "taos.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "qast.h"
#include "tcompare.h" #include "tcompare.h"
#include "tname.h" #include "tname.h"
#include "tscLog.h" #include "tscLog.h"
......
...@@ -294,52 +294,31 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -294,52 +294,31 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
} }
if (rpcMsg->pCont == NULL) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
} else {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
// if (rpcMsg->code != TSDB_CODE_RPC_NETWORK_UNAVAIL) {
// if (pCmd->command == TSDB_SQL_CONNECT) {
// rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// rpcFreeCont(rpcMsg->pCont);
// return;
// }
// if (pCmd->command == TSDB_SQL_HB) {
// rpcMsg->code = TSDB_CODE_RPC_NOT_READY;
// rpcFreeCont(rpcMsg->pCont);
// return;
// }
// if (pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
// pCmd->command == TSDB_SQL_STABLEVGROUP || pCmd->command == TSDB_SQL_SHOW ||
// pCmd->command == TSDB_SQL_RETRIEVE) {
// // get table meta/vgroup query will not retry, do nothing
// }
// }
if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_INSERT ||
pCmd->command == TSDB_SQL_UPDATE_TAGS_VAL) &&
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
pSql->res.code = rpcMsg->code; // keep the previous error code int32_t cmd = pCmd->command;
if (pSql->retry > pSql->maxRetry) { if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
} else { rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
// if there is an error occurring, proceed to the following error handling procedure. tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
// todo add test cases
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
rpcFreeCont(rpcMsg->pCont); if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
return; pSql->cmd.submitSchema = 1;
} }
pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
} else {
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);
// if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcFreeCont(rpcMsg->pCont);
return;
} }
} }
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "hash.h" #include "hash.h"
#include "os.h" #include "os.h"
#include "qast.h" #include "qAst.h"
#include "tcache.h" #include "tcache.h"
#include "tnote.h" #include "tnote.h"
#include "trpc.h" #include "trpc.h"
......
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
*/ */
#include "os.h" #include "os.h"
#include "qtsbuf.h" #include "qAst.h"
#include "qast.h" #include "qTsbuf.h"
#include "tcompare.h" #include "tcompare.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h" #include "tscSubquery.h"
......
...@@ -13,11 +13,11 @@ ...@@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "hash.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "hash.h"
#include "os.h"
#include "qAst.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "qast.h"
#include "tcache.h" #include "tcache.h"
#include "tkey.h" #include "tkey.h"
#include "tmd5.h" #include "tmd5.h"
......
...@@ -45,7 +45,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *); ...@@ -45,7 +45,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *);
* *
*/ */
typedef struct tQueryInfo { typedef struct tQueryInfo {
int32_t colIndex; // index of column in schema
uint8_t optr; // expression operator uint8_t optr; // expression operator
SSchema sch; // schema of tags SSchema sch; // schema of tags
char* q; char* q;
......
...@@ -18,16 +18,16 @@ ...@@ -18,16 +18,16 @@
#include "os.h" #include "os.h"
#include "hash.h" #include "hash.h"
#include "qfill.h" #include "qFill.h"
#include "qresultBuf.h" #include "qResultbuf.h"
#include "qTsbuf.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "qtsbuf.h" #include "query.h"
#include "taosdef.h" #include "taosdef.h"
#include "tarray.h" #include "tarray.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "query.h"
struct SColumnFilterElem; struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
...@@ -158,7 +158,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -158,7 +158,7 @@ typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage; int32_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo; SFillInfo* pFillInfo;
......
...@@ -28,7 +28,7 @@ extern "C" { ...@@ -28,7 +28,7 @@ extern "C" {
#include "tdataformat.h" #include "tdataformat.h"
#include "talgo.h" #include "talgo.h"
#define DEFAULT_PAGE_SIZE (1024L*64) // 16k larger than the SHistoInfo #define DEFAULT_PAGE_SIZE (1024L*4) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX #define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64 #define INITIAL_ALLOCATION_BUFFER_SIZE 64
......
...@@ -21,8 +21,8 @@ extern "C" { ...@@ -21,8 +21,8 @@ extern "C" {
#endif #endif
#include "os.h" #include "os.h"
#include "qExtbuffer.h"
#include "taosdef.h" #include "taosdef.h"
#include "qextbuffer.h"
typedef struct { typedef struct {
STColumn col; // column info STColumn col; // column info
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#ifndef TDENGINE_QPERCENTILE_H #ifndef TDENGINE_QPERCENTILE_H
#define TDENGINE_QPERCENTILE_H #define TDENGINE_QPERCENTILE_H
#include "qextbuffer.h" #include "qExtbuffer.h"
typedef struct MinMaxEntry { typedef struct MinMaxEntry {
union { union {
......
...@@ -20,9 +20,9 @@ ...@@ -20,9 +20,9 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "qextbuffer.h"
#include "hash.h" #include "hash.h"
#include "os.h"
#include "qExtbuffer.h"
typedef struct SArray* SIDList; typedef struct SArray* SIDList;
...@@ -33,14 +33,20 @@ typedef struct SDiskbasedResultBuf { ...@@ -33,14 +33,20 @@ typedef struct SDiskbasedResultBuf {
int32_t fd; // data file fd int32_t fd; // data file fd
int32_t allocateId; // allocated page id int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages int32_t incStep; // minimum allocated pages
char* pBuf; // mmap buffer pointer void* pBuf; // mmap buffer pointer
char* path; // file path char* path; // file path
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* idsTable; // id hash table SHashObj* idsTable; // id hash table
SIDList list; // for each id, there is a page id list SIDList list; // for each id, there is a page id list
void* iBuf; // inmemory buf
void* handle; // for debug purpose
void* emptyDummyIdList; // dummy id list
} SDiskbasedResultBuf; } SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5) #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
#define DEFAULT_INMEM_BUF_PAGES 10
/** /**
* create disk-based result buffer * create disk-based result buffer
...@@ -49,7 +55,8 @@ typedef struct SDiskbasedResultBuf { ...@@ -49,7 +55,8 @@ typedef struct SDiskbasedResultBuf {
* @param rowSize * @param rowSize
* @return * @return
*/ */
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle); int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
int32_t inMemPages, void* handle);
/** /**
* *
...@@ -81,8 +88,13 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); ...@@ -81,8 +88,13 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id * @param id
* @return * @return
*/ */
#define GET_RES_BUF_PAGE_BY_ID(buf, id) ((tFilePage*)((buf)->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE*(id))) static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
if (id < pResultBuf->inMemPages) {
return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize);
} else {
return (tFilePage*) ((char*) pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize);
}
}
/** /**
* get the total buffer size in the format of disk file * get the total buffer size in the format of disk file
* @param pResultBuf * @param pResultBuf
......
...@@ -49,7 +49,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3 ...@@ -49,7 +49,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
assert(pResult != NULL && pRuntimeEnv != NULL); assert(pResult != NULL && pRuntimeEnv != NULL);
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
tFilePage *page = GET_RES_BUF_PAGE_BY_ID(pRuntimeEnv->pResultBuf, pResult->pos.pageId); tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery); int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
...@@ -59,6 +59,4 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3 ...@@ -59,6 +59,4 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
__filter_func_t *getRangeFilterFuncArray(int32_t type); __filter_func_t *getRangeFilterFuncArray(int32_t type);
__filter_func_t *getValueFilterFuncArray(int32_t type); __filter_func_t *getValueFilterFuncArray(int32_t type);
bool supportPrefilter(int32_t type);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
...@@ -255,7 +255,15 @@ extern int32_t functionCompatList[]; // compatible check array list ...@@ -255,7 +255,15 @@ extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval); bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval);
void resetResultInfo(SResultInfo *pResInfo); /**
* the numOfRes should be kept, since it may be used later
* and allow the ResultInfo to be re initialized
*/
#define RESET_RESULT_INFO(_r) \
do { \
(_r)->initialized = false; \
} while (0)
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf); void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) { static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
......
...@@ -16,17 +16,17 @@ ...@@ -16,17 +16,17 @@
#include "os.h" #include "os.h"
#include "tname.h"
#include "qast.h"
#include "tsdb.h"
#include "exception.h" #include "exception.h"
#include "qAst.h"
#include "qSyntaxtreefunction.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "qsyntaxtreefunction.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tarray.h" #include "tarray.h"
#include "tbuffer.h" #include "tbuffer.h"
#include "tcompare.h" #include "tcompare.h"
#include "tname.h"
#include "tsdb.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tstoken.h" #include "tstoken.h"
...@@ -678,7 +678,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, ...@@ -678,7 +678,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
tstr *name = (tstr*) tsdbGetTableName(*(void**) pData); tstr *name = (tstr*) tsdbGetTableName(*(void**) pData);
// todo speed up by using hash // todo speed up by using hash
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) { if (pQueryInfo->optr == TSDB_RELATION_IN) {
addToResult = pQueryInfo->compare(name, pQueryInfo->q); addToResult = pQueryInfo->compare(name, pQueryInfo->q);
} else if (pQueryInfo->optr == TSDB_RELATION_LIKE) { } else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
...@@ -716,7 +716,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -716,7 +716,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
} }
tQueryInfo *pQueryInfo = pExpr->_node.info; tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) { if (pQueryInfo->sch.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && pQueryInfo->optr != TSDB_RELATION_LIKE) {
tQueryIndexColumn(pSkipList, pQueryInfo, result); tQueryIndexColumn(pSkipList, pQueryInfo, result);
} else { } else {
tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn); tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
......
此差异已折叠。
...@@ -12,16 +12,15 @@ ...@@ -12,16 +12,15 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "qExtbuffer.h"
#include "os.h" #include "os.h"
#include "tulog.h" #include "queryLog.h"
#include "qextbuffer.h"
#include "taos.h" #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "ttime.h" #include "tulog.h"
#include "tutil.h" #include "tutil.h"
#include "queryLog.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ #define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes) (data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
......
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "qFill.h"
#include "os.h" #include "os.h"
#include "qfill.h" #include "qExtbuffer.h"
#include "qextbuffer.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
......
...@@ -554,5 +554,3 @@ __filter_func_t* getValueFilterFuncArray(int32_t type) { ...@@ -554,5 +554,3 @@ __filter_func_t* getValueFilterFuncArray(int32_t type) {
default: return NULL; default: return NULL;
} }
} }
bool supportPrefilter(int32_t type) { return type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR; }
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#include "os.h" #include "os.h"
#include "qhistogram.h" #include "qHistogram.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlosertree.h" #include "tlosertree.h"
......
...@@ -13,12 +13,12 @@ ...@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "qPercentile.h"
#include "os.h" #include "os.h"
#include "tulog.h" #include "queryLog.h"
#include "qpercentile.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "queryLog.h" #include "tulog.h"
tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) { tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) {
tExtMemBuffer *pBuffer = NULL; tExtMemBuffer *pBuffer = NULL;
......
#include "qresultBuf.h" #include "qResultbuf.h"
#include "hash.h" #include "hash.h"
#include "qextbuffer.h" #include "qExtbuffer.h"
#include "taoserror.h"
#include "queryLog.h" #include "queryLog.h"
#include "taoserror.h"
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
int32_t pagesize, int32_t inMemPages, void* handle) {
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf)); *pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
SDiskbasedResultBuf* pResBuf = *pResultBuf; SDiskbasedResultBuf* pResBuf = *pResultBuf;
if (pResBuf == NULL) { if (pResBuf == NULL) {
return TSDB_CODE_COM_OUT_OF_MEMORY; return TSDB_CODE_COM_OUT_OF_MEMORY;
} }
pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize;
pResBuf->numOfPages = size;
pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE; pResBuf->pageSize = pagesize;
pResBuf->numOfPages = inMemPages; // all pages are in buffer in the first place
pResBuf->inMemPages = inMemPages;
assert(inMemPages <= numOfPages);
pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
pResBuf->totalBufSize = pResBuf->numOfPages * pagesize;
pResBuf->incStep = 4; pResBuf->incStep = 4;
pResBuf->allocateId = -1;
pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
// init id hash table // init id hash table
pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pResBuf->idsTable = taosHashInit(numOfPages, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->list = taosArrayInit(size, POINTER_BYTES); pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
char path[4096] = {0}; char path[PATH_MAX] = {0};
getTmpfilePath("tsdb_qbuf", path); getTmpfilePath("tsdb_qbuf", path);
pResBuf->path = strdup(path); pResBuf->path = strdup(path);
pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666); pResBuf->fd = FD_INITIALIZER;
if (!FD_VALID(pResBuf->fd)) { pResBuf->pBuf = NULL;
qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno)); pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle,
pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize);
return TSDB_CODE_SUCCESS;
}
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
#define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages)
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
if (!FD_VALID(pResultBuf->fd)) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE); assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
pResultBuf->numOfPages += pResultBuf->incStep;
int32_t ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno)); qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0); pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
if (pResBuf->pBuf == MAP_FAILED) { if (pResultBuf->pBuf == MAP_FAILED) {
qError("QInfo:%p failed to map temp file: %s. %s", handle, pResBuf->path, strerror(errno)); qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
qDebug("QInfo:%p create tmp file for output result:%s, %" PRId64 "bytes", handle, pResBuf->path, pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
pResBuf->totalBufSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); } static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNumOfPages) {
assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize);
int32_t ret = TSDB_CODE_SUCCESS;
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; } if (pResultBuf->pBuf == NULL) {
assert(pResultBuf->fd == FD_INITIALIZER);
static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOfPages) { if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE == pResultBuf->totalBufSize); return ret;
}
int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize); } else {
pResultBuf->numOfPages += numOfPages; ret = munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->numOfPages += incNumOfPages;
/*
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may /*
* be insufficient * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
*/ * be insufficient
ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE); */
if (ret != 0) { ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, if (ret != TSDB_CODE_SUCCESS) {
// strerror(errno)); // dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
return TSDB_CODE_QRY_NO_DISKSPACE; // strerror(errno));
} return TSDB_CODE_QRY_NO_DISKSPACE;
}
pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE; pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
pResultBuf->pBuf = mmap(NULL, pResultBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0); pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
if (pResultBuf->pBuf == MAP_FAILED) { if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); // dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) { #define NO_AVAILABLE_PAGES(_b) ((_b)->allocateId == (_b)->numOfPages - 1)
return (pResultBuf->allocateId == pResultBuf->numOfPages - 1);
}
static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL); assert(pResultBuf != NULL);
...@@ -121,20 +152,19 @@ static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int ...@@ -121,20 +152,19 @@ static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int
} }
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
if (noMoreAvailablePages(pResultBuf)) { if (NO_AVAILABLE_PAGES(pResultBuf)) {
if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) { if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
} }
// register new id in this group // register new id in this group
*pageId = (pResultBuf->allocateId++); *pageId = (++pResultBuf->allocateId);
registerPageId(pResultBuf, groupId, *pageId); registerPageId(pResultBuf, groupId, *pageId);
tFilePage* page = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pageId);
// clear memory for the new page // clear memory for the new page
memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE); tFilePage* page = getResBufPage(pResultBuf, *pageId);
memset(page, 0, pResultBuf->pageSize);
return page; return page;
} }
...@@ -144,7 +174,7 @@ int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf ...@@ -144,7 +174,7 @@ int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t slot = getGroupIndex(pResultBuf, groupId); int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) { if (slot < 0) {
return taosArrayInit(1, sizeof(int32_t)); return pResultBuf->emptyDummyIdList;
} else { } else {
return taosArrayGetP(pResultBuf->list, slot); return taosArrayGetP(pResultBuf->list, slot);
} }
...@@ -156,13 +186,18 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { ...@@ -156,13 +186,18 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
} }
if (FD_VALID(pResultBuf->fd)) { if (FD_VALID(pResultBuf->fd)) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
close(pResultBuf->fd); close(pResultBuf->fd);
munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->pBuf = NULL;
} else {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle,
pResultBuf->totalBufSize);
} }
qDebug("QInfo:%p disk-based output buffer closed, %" PRId64 " bytes, file:%s", handle, pResultBuf->totalBufSize, pResultBuf->path);
munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
unlink(pResultBuf->path); unlink(pResultBuf->path);
tfree(pResultBuf->path); tfree(pResultBuf->path);
size_t size = taosArrayGetSize(pResultBuf->list); size_t size = taosArrayGetSize(pResultBuf->list);
...@@ -172,8 +207,10 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { ...@@ -172,8 +207,10 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
} }
taosArrayDestroy(pResultBuf->list); taosArrayDestroy(pResultBuf->list);
taosArrayDestroy(pResultBuf->emptyDummyIdList);
taosHashCleanup(pResultBuf->idsTable); taosHashCleanup(pResultBuf->idsTable);
tfree(pResultBuf->iBuf);
tfree(pResultBuf); tfree(pResultBuf);
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "os.h" #include "os.h"
#include "qsyntaxtreefunction.h" #include "qSyntaxtreefunction.h"
#include "taosdef.h" #include "taosdef.h"
#include "tutil.h" #include "tutil.h"
......
#include "qtsbuf.h" #include "qTsbuf.h"
#include "taoserror.h"
#include "tscompression.h" #include "tscompression.h"
#include "tutil.h" #include "tutil.h"
#include "taoserror.h"
static int32_t getDataStartOffset(); static int32_t getDataStartOffset();
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo); static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo);
......
...@@ -26,12 +26,10 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { ...@@ -26,12 +26,10 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0; int32_t size = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
size += pQuery->pSelectExpr[i].interBytes; size += pQuery->pSelectExpr[i].interBytes;
} }
assert(size > 0); assert(size > 0);
return size; return size;
} }
...@@ -243,7 +241,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow ...@@ -243,7 +241,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memset(s, 0, size); memset(s, 0, size);
resetResultInfo(pResultInfo); RESET_RESULT_INFO(pResultInfo);
} }
pWindowRes->numOfRows = 0; pWindowRes->numOfRows = 0;
......
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
#include <cassert> #include <cassert>
#include <iostream> #include <iostream>
#include "qAst.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "qast.h"
#include "tsdb.h" #include "tsdb.h"
#include "tskiplist.h" #include "tskiplist.h"
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "tstoken.h" #include "tstoken.h"
#include "tutil.h" #include "tutil.h"
#include "qhistogram.h" #include "qHistogram.h"
/* test validate the names for table/database */ /* test validate the names for table/database */
TEST(testCase, histogram_binary_search) { TEST(testCase, histogram_binary_search) {
......
...@@ -2,15 +2,15 @@ ...@@ -2,15 +2,15 @@
#include <cassert> #include <cassert>
#include <iostream> #include <iostream>
#include "qResultbuf.h"
#include "taos.h" #include "taos.h"
#include "qresultBuf.h"
#include "tsdb.h" #include "tsdb.h"
namespace { namespace {
// simple test // simple test
void simpleTest() { void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL; SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, NULL); int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL);
int32_t pageId = 0; int32_t pageId = 0;
int32_t groupId = 0; int32_t groupId = 0;
...@@ -23,7 +23,6 @@ void simpleTest() { ...@@ -23,7 +23,6 @@ void simpleTest() {
SIDList list = getDataBufPagesIdList(pResultBuf, groupId); SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1); ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1); ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
destroyResultBuf(pResultBuf, NULL); destroyResultBuf(pResultBuf, NULL);
......
...@@ -5,10 +5,10 @@ ...@@ -5,10 +5,10 @@
#include "taos.h" #include "taos.h"
#include "tsdb.h" #include "tsdb.h"
#include "qTsbuf.h"
#include "tstoken.h" #include "tstoken.h"
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
#include "qtsbuf.h"
namespace { namespace {
/** /**
......
...@@ -426,6 +426,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version); ...@@ -426,6 +426,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
......
...@@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo); ...@@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
...@@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK ...@@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK
return 0; return 0;
} }
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision]; *minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
} }
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "tcompare.h" #include "tcompare.h"
#include "exception.h" #include "exception.h"
#include "../../../query/inc/qast.h" // todo move to common module #include "../../query/inc/qAst.h" // todo move to common module
#include "tlosertree.h" #include "tlosertree.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbMain.h" #include "tsdbMain.h"
...@@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle { ...@@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle {
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbQueryHandle* pQueryHandle); STsdbQueryHandle* pQueryHandle);
...@@ -240,7 +239,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -240,7 +239,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); tsdbDebug("%p total numOfTable:%zu in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
...@@ -331,7 +330,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -331,7 +330,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle, tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
} else { } else {
tsdbDebug("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid); tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
pHandle->qinfo);
} }
if (!imemEmpty) { if (!imemEmpty) {
...@@ -343,7 +343,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -343,7 +343,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle, tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
} else { } else {
tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid); tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
pHandle->qinfo);
} }
return true; return true;
...@@ -354,7 +355,7 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { ...@@ -354,7 +355,7 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
tSkipListDestroyIter(pCheckInfo->iiter); tSkipListDestroyIter(pCheckInfo->iiter);
} }
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) {
SDataRow rmem = NULL, rimem = NULL; SDataRow rmem = NULL, rimem = NULL;
if (pCheckInfo->iter) { if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
...@@ -371,20 +372,35 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { ...@@ -371,20 +372,35 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
} }
if (rmem != NULL && rimem != NULL) { if (rmem != NULL && rimem != NULL) {
if (dataRowKey(rmem) < dataRowKey(rimem)) { TSKEY r1 = dataRowKey(rmem);
pCheckInfo->chosen = 0; TSKEY r2 = dataRowKey(rimem);
return rmem;
} else if (dataRowKey(rmem) == dataRowKey(rimem)) { if (r1 == r2) { // data ts are duplicated, ignore the data in mem
// data ts are duplicated, ignore the data in mem
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
pCheckInfo->chosen = 1; pCheckInfo->chosen = 1;
return rimem; return rimem;
} else { } else {
pCheckInfo->chosen = 1; if (ASCENDING_TRAVERSE(order)) {
return rimem; if (r1 < r2) {
pCheckInfo->chosen = 0;
return rmem;
} else {
pCheckInfo->chosen = 1;
return rimem;
}
} else {
if (r1 < r2) {
pCheckInfo->chosen = 1;
return rimem;
} else {
pCheckInfo->chosen = 0;
return rmem;
}
}
} }
} }
// at least one (rmem or rimem) is absent here
if (rmem != NULL) { if (rmem != NULL) {
pCheckInfo->chosen = 0; pCheckInfo->chosen = 0;
return rmem; return rmem;
...@@ -398,7 +414,7 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) { ...@@ -398,7 +414,7 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
return NULL; return NULL;
} }
static bool moveToNextRow(STableCheckInfo* pCheckInfo) { static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
bool hasNext = false; bool hasNext = false;
if (pCheckInfo->chosen == 0) { if (pCheckInfo->chosen == 0) {
if (pCheckInfo->iter != NULL) { if (pCheckInfo->iter != NULL) {
...@@ -412,19 +428,17 @@ static bool moveToNextRow(STableCheckInfo* pCheckInfo) { ...@@ -412,19 +428,17 @@ static bool moveToNextRow(STableCheckInfo* pCheckInfo) {
if (pCheckInfo->iiter != NULL) { if (pCheckInfo->iiter != NULL) {
return tSkipListIterGet(pCheckInfo->iiter) != NULL; return tSkipListIterGet(pCheckInfo->iiter) != NULL;
} }
} else { } else { //pCheckInfo->chosen == 1
if (pCheckInfo->chosen == 1) { if (pCheckInfo->iiter != NULL) {
if (pCheckInfo->iiter != NULL) { hasNext = tSkipListIterNext(pCheckInfo->iiter);
hasNext = tSkipListIterNext(pCheckInfo->iiter); }
}
if (hasNext) { if (hasNext) {
return hasNext; return hasNext;
} }
if (pCheckInfo->iter != NULL) { if (pCheckInfo->iter != NULL) {
return tSkipListIterGet(pCheckInfo->iter) != NULL; return tSkipListIterGet(pCheckInfo->iter) != NULL;
}
} }
} }
...@@ -445,7 +459,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { ...@@ -445,7 +459,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
initTableMemIterator(pHandle, pCheckInfo); initTableMemIterator(pHandle, pCheckInfo);
} }
SDataRow row = getSDataRowInTableMem(pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order);
if (row == NULL) { if (row == NULL) {
return false; return false;
} }
...@@ -650,7 +664,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -650,7 +664,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1); cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
...@@ -680,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -680,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
} }
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
/* /*
* no data in cache, only load data from file * no data in cache, only load data from file
...@@ -696,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -696,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->mixBlock = false; cur->mixBlock = false;
cur->blockCompleted = true; cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1); cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
pCheckInfo->lastKey = cur->lastKey;
} }
} }
...@@ -719,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -719,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = 0; cur->pos = 0;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer } else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
...@@ -736,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -736,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = pBlock->numOfRows - 1; cur->pos = pBlock->numOfRows - 1;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
...@@ -892,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap ...@@ -892,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.win.ekey = tsArray[end];
pQueryHandle->cur.lastKey = tsArray[end] + step; pQueryHandle->cur.lastKey = tsArray[end] + step;
return numOfRows + num; return numOfRows + num;
} }
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) { int32_t numOfCols, STable* pTable) {
char* pData = NULL; char* pData = NULL;
// the schema version info is embeded in SDataRow // the schema version info is embeded in SDataRow
...@@ -958,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -958,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
// only return the qualified data to client in terms of query time window, data rows in the same block but do not // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded // be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
SArray* sa) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
...@@ -972,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -972,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = cur->pos; int32_t endPos = cur->pos;
...@@ -1033,7 +1046,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1033,7 +1046,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL; SSkipListNode* node = NULL;
do { do {
SDataRow row = getSDataRowInTableMem(pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
if (row == NULL) { if (row == NULL) {
break; break;
} }
...@@ -1051,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1051,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
...@@ -1061,9 +1074,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1061,9 +1074,9 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->lastKey = key + step; cur->lastKey = key + step;
cur->mixBlock = true; cur->mixBlock = true;
moveToNextRow(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
moveToNextRow(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
...@@ -1072,7 +1085,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1072,7 +1085,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
moveToNextRow(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} }
int32_t start = -1; int32_t start = -1;
...@@ -1376,7 +1389,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1376,7 +1389,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
* } * }
*/ */
tsdbDebug("%p %d data blocks sort completed", pQueryHandle, cnt); tsdbDebug("%p %d data blocks sort completed, %p", pQueryHandle, cnt, pQueryHandle->qinfo);
cleanBlockOrderSupporter(&sup, numOfTables); cleanBlockOrderSupporter(&sup, numOfTables);
free(pTree); free(pTree);
...@@ -1391,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex ...@@ -1391,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
STimeWindow win = TSWINDOW_INITIALIZER;
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
tsdbDebug("%p remain files are not qualified for qrange:%"PRId64"-%"PRId64", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo)
pQueryHandle->pFileGroup = NULL;
break;
}
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break; break;
} }
...@@ -1750,11 +1776,10 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1750,11 +1776,10 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
win->skey = TSKEY_INITIAL_VAL; win->skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
do { do {
SDataRow row = getSDataRowInTableMem(pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
if (row == NULL) { if (row == NULL) {
break; break;
} }
...@@ -1772,14 +1797,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -1772,14 +1797,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} }
win->ekey = key; win->ekey = key;
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable); copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
if (++numOfRows >= maxRowsToRead) { if (++numOfRows >= maxRowsToRead) {
moveToNextRow(pCheckInfo); moveToNextRowInMem(pCheckInfo);
break; break;
} }
} while(moveToNextRow(pCheckInfo)); } while(moveToNextRowInMem(pCheckInfo));
assert(numOfRows <= maxRowsToRead); assert(numOfRows <= maxRowsToRead);
...@@ -1869,7 +1894,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -1869,7 +1894,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
} }
// todo opt perf
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i); SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) { if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst; pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
...@@ -1961,43 +1985,20 @@ static void destroyHelper(void* param) { ...@@ -1961,43 +1985,20 @@ static void destroyHelper(void* param) {
free(param); free(param);
} }
#define TAG_INVALID_COLUMN_INDEX -2
static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) {
// filter on table name(TBNAME)
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
return TSDB_TBNAME_COLUMN_INDEX;
}
for(int32_t i = 0; i < schemaNCols(pTSchema); ++i) {
STColumn* pColumn = &pTSchema->columns[i];
if (pColumn->bytes == pSchema->bytes && pColumn->type == pSchema->type && pColumn->colId == pSchema->colId) {
return i;
}
}
return -2;
}
void filterPrepare(void* expr, void* param) { void filterPrepare(void* expr, void* param) {
tExprNode* pExpr = (tExprNode*)expr; tExprNode* pExpr = (tExprNode*)expr;
if (pExpr->_node.info != NULL) { if (pExpr->_node.info != NULL) {
return; return;
} }
int32_t i = 0;
pExpr->_node.info = calloc(1, sizeof(tQueryInfo)); pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
STSchema* pTSSchema = (STSchema*) param; STSchema* pTSSchema = (STSchema*) param;
tQueryInfo* pInfo = pExpr->_node.info; tQueryInfo* pInfo = pExpr->_node.info;
tVariant* pCond = pExpr->_node.pRight->pVal; tVariant* pCond = pExpr->_node.pRight->pVal;
SSchema* pSchema = pExpr->_node.pLeft->pSchema; SSchema* pSchema = pExpr->_node.pLeft->pSchema;
int32_t index = getTagColumnIndex(pTSSchema, pSchema);
assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX) || index == TAG_INVALID_COLUMN_INDEX);
pInfo->sch = *pSchema; pInfo->sch = *pSchema;
pInfo->colIndex = index;
pInfo->optr = pExpr->_node.optr; pInfo->optr = pExpr->_node.optr;
pInfo->compare = getComparFunc(pSchema->type, pInfo->optr); pInfo->compare = getComparFunc(pSchema->type, pInfo->optr);
pInfo->param = pTSSchema; pInfo->param = pTSSchema;
...@@ -2143,7 +2144,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { ...@@ -2143,7 +2144,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
char* val = NULL; char* val = NULL;
if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
val = (char*) TABLE_NAME(pTable); val = (char*) TABLE_NAME(pTable);
} else { } else {
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId); val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
......
...@@ -35,12 +35,12 @@ extern "C" { ...@@ -35,12 +35,12 @@ extern "C" {
#define WCHAR wchar_t #define WCHAR wchar_t
#define tfree(x) \ #define tfree(x) \
{ \ do { \
if (x) { \ if (x) { \
free((void *)(x)); \ free((void *)(x)); \
x = 0; \ x = 0; \
} \ } \
} } while(0);
#define tstrncpy(dst, src, size) \ #define tstrncpy(dst, src, size) \
do { \ do { \
......
...@@ -522,7 +522,7 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP ...@@ -522,7 +522,7 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char* tdengineTmpFileNamePrefix = "tdengine-"; const char* tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX] = {0}; char tmpPath[PATH_MAX];
#ifdef WINDOWS #ifdef WINDOWS
char *tmpDir = getenv("tmp"); char *tmpDir = getenv("tmp");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册