未验证 提交 38598c5e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2784 from taosdata/feature/query

Feature/query
......@@ -31,8 +31,8 @@ extern "C" {
#include "tutil.h"
#include "qExecutor.h"
#include "qSqlparser.h"
#include "qTsbuf.h"
#include "qsqlparser.h"
#include "tcmdtype.h"
// forward declaration
......
......@@ -430,7 +430,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
pRes->code = code;
if (code != TSDB_CODE_SUCCESS) {
tscError("%p ge tableMeta failed, code:%s", pSql, tstrerror(code));
tscError("%p get tableMeta failed, code:%s", pSql, tstrerror(code));
goto _error;
} else {
tscDebug("%p get tableMeta successfully", pSql);
......
......@@ -2131,6 +2131,11 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
}
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo == NULL) {
return true;
}
STopBotInfo *pTopBotInfo = getTopBotOutputInfo(pCtx);
// required number of results are not reached, continue load data block
......
......@@ -691,9 +691,15 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pModel = createColumnModel(pSchema, size, capacity);
int32_t pg = DEFAULT_PAGE_SIZE;
int32_t overhead = sizeof(tFilePage);
while((pg - overhead) < pModel->rowSize * 2) {
pg *= 2;
}
size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
for (int32_t i = 0; i < numOfSubs; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
......
......@@ -1505,12 +1505,11 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSql, trsupport, pSql);
// todo add to async res or not??
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex);
tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex);
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
pParentSql->res.code = terrno;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return pParentSql->res.code;
......
......@@ -148,7 +148,7 @@ void taos_init_imp() {
refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscCacheHandle == NULL) {
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "client");
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
}
tscDebug("client is initialized successfully");
......
......@@ -356,9 +356,9 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
// pSql->sqlstr will be used by tscBuildQueryStreamDesc
if (pObj->signature == pObj) {
pthread_mutex_lock(&pObj->mutex);
//pthread_mutex_lock(&pObj->mutex);
tfree(pSql->sqlstr);
pthread_mutex_unlock(&pObj->mutex);
//pthread_mutex_unlock(&pObj->mutex);
}
tscFreeSqlResult(pSql);
......@@ -1675,6 +1675,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
tscError("%p new subquery failed, tableIndex:%d", pSql, tableIndex);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
......@@ -1688,6 +1689,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
free(pNew);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
......@@ -1706,6 +1708,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
......@@ -1743,6 +1746,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
tscFreeSqlObj(pNew);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
......@@ -1827,8 +1831,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
if (pFinalInfo->pTableMeta == NULL) {
tscError("%p new subquery failed for get tableMeta is NULL from cache", pSql);
tscError("%p new subquery failed since no tableMeta in cache, name:%s", pSql, name);
tscFreeSqlObj(pNew);
if (pPrevSql != NULL) {
assert(pPrevSql->res.code != TSDB_CODE_SUCCESS);
terrno = pPrevSql->res.code;
} else {
terrno = TSDB_CODE_TSC_APP_ERROR;
}
return NULL;
}
......
......@@ -49,7 +49,7 @@ static taos_qset readQset;
int32_t dnodeInitVnodeRead() {
readQset = taosOpenQset();
readPool.min = 2;
readPool.min = tsNumOfCores;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max);
......@@ -206,11 +206,15 @@ static void *dnodeProcessReadQueue(void *param) {
taosMsg[pReadMsg->rpcMsg.msgType], type);
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
if (type == TAOS_QTYPE_RPC) {
if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
} else {
if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, TSDB_CODE_SUCCESS);
} else {
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
}
}
taosFreeQitem(pReadMsg);
}
......
......@@ -28,7 +28,7 @@ typedef void* qinfo_t;
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, qinfo_t* qinfo);
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
/**
......@@ -38,7 +38,10 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs
* @param qinfo
* @return
*/
void qTableQuery(qinfo_t qinfo);
bool qTableQuery(qinfo_t qinfo);
void* pGetRspMsg(qinfo_t qinfo);
/**
* Retrieve the produced results information, if current query is not paused or completed,
......@@ -48,7 +51,7 @@ void qTableQuery(qinfo_t qinfo);
* @param qinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo);
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext);
/**
*
......@@ -60,16 +63,9 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo);
* @param contLen payload length
* @return
*/
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/**
* Decide if more results will be produced or not, NOTE: this function will increase the ref count of QInfo,
* so it can be only called once for each retrieve
*
* @param qinfo
* @return
*/
bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
void* qGetResultRetrieveMsg(qinfo_t qinfo);
/**
* kill current ongoing query and free query handle automatically
......
......@@ -216,6 +216,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_OUT_OF_MEMORY, 0, 0x0703, "query out
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_APP_ERROR, 0, 0x0704, "query app error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "query duplicated join key")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXCEED_TAGS_LIMIT, 0, 0x0706, "query tag conditon too many")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "query not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "query should response")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "grant expired")
......
......@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
tsMnodeConnCache = taosCacheInit(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn");
tsMnodeConnCache = taosCacheInit(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, true, mnodeFreeConn, "conn");
return 0;
}
......@@ -119,7 +119,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
return NULL;
}
if (pConn->ip != ip || pConn->port != port /* || strcmp(pConn->user, user) != 0 */) {
if (/* pConn->ip != ip || */ pConn->port != port /* || strcmp(pConn->user, user) != 0 */) {
mError("connId:%d, incoming conn user:%s ip:%s:%u, not match exist conn user:%s ip:%s:%u", connId, user,
taosIpStr(ip), port, pConn->user, taosIpStr(pConn->ip), pConn->port);
taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
......
......@@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) {
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc");
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
......
......@@ -20,8 +20,8 @@
#include "hash.h"
#include "qFill.h"
#include "qResultbuf.h"
#include "qSqlparser.h"
#include "qTsbuf.h"
#include "qsqlparser.h"
#include "query.h"
#include "taosdef.h"
#include "tarray.h"
......@@ -43,7 +43,7 @@ typedef struct SSqlGroupbyExpr {
typedef struct SPosInfo {
int32_t pageId;
int16_t rowId;
int32_t rowId;
} SPosInfo;
typedef struct SWindowStatus {
......@@ -177,13 +177,18 @@ typedef struct SQueryRuntimeEnv {
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv;
enum {
QUERY_RESULT_NOT_READY = 1,
QUERY_RESULT_READY = 2,
};
typedef struct SQInfo {
void* signature;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
sem_t dataReady;
// sem_t dataReady;
void* tsdb;
void* param;
int32_t vgId;
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
......@@ -202,6 +207,9 @@ typedef struct SQInfo {
int32_t numOfGroupResultPages;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads
int32_t dataReady; // denote if query result is ready or not
void* rspContext; // response context
} SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H
......@@ -19,7 +19,6 @@
extern "C" {
#endif
#include "os.h"
#include "taosmsg.h"
......@@ -28,9 +27,9 @@ extern "C" {
#include "tdataformat.h"
#include "talgo.h"
#define DEFAULT_PAGE_SIZE (1024L*4) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
#define DEFAULT_PAGE_SIZE (4096L) // 16k larger than the SHistoInfo
typedef enum EXT_BUFFER_FLUSH_MODEL {
/*
......@@ -126,7 +125,7 @@ typedef struct tExtMemBuffer {
* @param pModel
* @return
*/
tExtMemBuffer *createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnModel *pModel);
tExtMemBuffer *createExtMemBuffer(int32_t inMemSize, int32_t elemSize, int32_t pagesize, SColumnModel *pModel);
/**
*
......
......@@ -13,50 +13,85 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEQUERYUTIL_H
#define TDENGINE_VNODEQUERYUTIL_H
#ifndef TDENGINE_QRESULTBUF_H
#define TDENGINE_QRESULTBUF_H
#ifdef __cplusplus
extern "C" {
#endif
#include <tlist.h>
#include "hash.h"
#include "os.h"
#include "qExtbuffer.h"
#include "tlockfree.h"
typedef struct SArray* SIDList;
typedef struct SPageDiskInfo {
int32_t offset;
int32_t length;
} SPageDiskInfo;
typedef struct SPageInfo {
SListNode* pn; // point to list node
int32_t pageId;
SPageDiskInfo info;
void* pData;
bool used; // set current page is in used
} SPageInfo;
typedef struct SFreeListItem {
int32_t offset;
int32_t len;
} SFreeListItem;
typedef struct SResultBufStatis {
int32_t flushBytes;
int32_t loadBytes;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SResultBufStatis;
typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
int32_t fd; // data file fd
int64_t fileSize; // disk file size
FILE* file;
int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages
void* pBuf; // mmap buffer pointer
char* path; // file path
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* idsTable; // id hash table
SIDList list; // for each id, there is a page id list
void* iBuf; // inmemory buf
void* handle; // for debug purpose
SHashObj* groupSet; // id hash table
SHashObj* all;
SList* lruList;
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk
int32_t nextPos; // next page flush position
const void* handle; // for debug purpose
SResultBufStatis statis;
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
#define DEFAULT_INTERN_BUF_PAGE_SIZE (4096L)
#define DEFAULT_INMEM_BUF_PAGES 10
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
/**
* create disk-based result buffer
* @param pResultBuf
* @param size
* @param rowSize
* @param pagesize
* @param inMemPages
* @param handle
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
int32_t inMemPages, void* handle);
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle);
/**
*
......@@ -72,7 +107,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
* @param pResultBuf
* @return
*/
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf);
size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf);
/**
*
......@@ -88,42 +123,52 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id
* @return
*/
static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
if (id < pResultBuf->inMemPages) {
return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize);
} else {
return (tFilePage*) ((char*) pResultBuf->pBuf + (id - pResultBuf->inMemPages) * pResultBuf->pageSize);
}
}
tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
/**
* release the referenced buf pages
* @param pResultBuf
* @param page
*/
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page);
/**
*
* @param pResultBuf
* @param pi
*/
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi);
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
* @return
*/
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf);
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf);
/**
* get the number of groups in the result buffer
* @param pResultBuf
* @return
*/
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf);
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf);
/**
* destroy result buffer
* @param pResultBuf
*/
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle);
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf);
/**
*
* @param pList
* @return
*/
int32_t getLastPageId(SIDList pList);
SPageInfo* getLastPageInfo(SIDList pList);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEQUERYUTIL_H
#endif // TDENGINE_QRESULTBUF_H
......@@ -45,13 +45,14 @@ bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize);
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult) {
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult,
tFilePage* page) {
assert(pResult != NULL && pRuntimeEnv != NULL);
SQuery *pQuery = pRuntimeEnv->pQuery;
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
// tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
pQuery->pSelectExpr[columnIndex].bytes * realRowId;
}
......
......@@ -18,8 +18,8 @@
#include "exception.h"
#include "qAst.h"
#include "qSqlparser.h"
#include "qSyntaxtreefunction.h"
#include "qsqlparser.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
......
此差异已折叠。
......@@ -28,10 +28,10 @@
/*
* SColumnModel is deeply copy
*/
tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnModel *pModel) {
tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, int32_t pagesize, SColumnModel *pModel) {
tExtMemBuffer* pMemBuffer = (tExtMemBuffer *)calloc(1, sizeof(tExtMemBuffer));
pMemBuffer->pageSize = DEFAULT_PAGE_SIZE;
pMemBuffer->pageSize = pagesize;
pMemBuffer->inMemCapacity = ALIGN8(inMemSize) / pMemBuffer->pageSize;
pMemBuffer->nElemSize = elemSize;
......
......@@ -14,7 +14,7 @@
*/
#include "os.h"
#include "qsqlparser.h"
#include "qSqlparser.h"
#include "queryLog.h"
#include "taosdef.h"
#include "taosmsg.h"
......
......@@ -535,7 +535,7 @@ void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows) {
if (pSeg->pBuffer[slotIdx] == NULL) {
pSeg->pBuffer[slotIdx] = createExtMemBuffer(pBucket->numOfTotalPages * pBucket->pageSize, pBucket->nElemSize,
pBucket->pOrderDesc->pColumnModel);
pBucket->pageSize, pBucket->pOrderDesc->pColumnModel);
pSeg->pBuffer[slotIdx]->flushModel = SINGLE_APPEND_MODEL;
pBucket->pOrderDesc->pColumnModel->capacity = pSeg->pBuffer[slotIdx]->numOfElemsPerPage;
}
......
#include "qResultbuf.h"
#include "stddef.h"
#include "tscompression.h"
#include "hash.h"
#include "qExtbuffer.h"
#include "queryLog.h"
#include "taoserror.h"
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
int32_t pagesize, int32_t inMemPages, void* handle) {
#define GET_DATA_PAYLOAD(_p) ((_p)->pData + POINTER_BYTES)
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle) {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
SDiskbasedResultBuf* pResBuf = *pResultBuf;
if (pResBuf == NULL) {
return TSDB_CODE_COM_OUT_OF_MEMORY;
}
pResBuf->pageSize = pagesize;
pResBuf->numOfPages = inMemPages; // all pages are in buffer in the first place
pResBuf->inMemPages = inMemPages;
assert(inMemPages <= numOfPages);
pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
pResBuf->totalBufSize = pResBuf->numOfPages * pagesize;
pResBuf->incStep = 4;
pResBuf->numOfPages = 0; // all pages are in buffer in the first place
pResBuf->totalBufSize = 0;
pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit.
pResBuf->allocateId = -1;
pResBuf->comp = true;
pResBuf->file = NULL;
pResBuf->handle = handle;
pResBuf->fileSize = 0;
pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
// at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2);
pResBuf->numOfRowsPerPage = (pagesize - sizeof(tFilePage)) / rowSize;
pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table
pResBuf->idsTable = taosHashInit(numOfPages, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
char path[PATH_MAX] = {0};
getTmpfilePath("tsdb_qbuf", path);
getTmpfilePath("qbuf", path);
pResBuf->path = strdup(path);
pResBuf->fd = FD_INITIALIZER;
pResBuf->pBuf = NULL;
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle,
pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize);
qDebug("QInfo:%p create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", handle, pResBuf->pageSize,
pResBuf->inMemPages, pResBuf->path);
return TSDB_CODE_SUCCESS;
}
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
#define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages)
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR | O_TRUNC, 0666);
if (!FD_VALID(pResultBuf->fd)) {
static int32_t createDiskFile(SDiskbasedResultBuf* pResultBuf) {
pResultBuf->file = fopen(pResultBuf->path, "wb+");
if (pResultBuf->file == NULL) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
pResultBuf->numOfPages += pResultBuf->incStep;
return TSDB_CODE_SUCCESS;
}
int32_t ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedResultBuf* pResultBuf) { // do nothing
if (!pResultBuf->comp) {
*dst = srcSize;
return data;
}
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
if (pResultBuf->pBuf == MAP_FAILED) {
qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
*dst = tsCompressString(data, srcSize, 1, pResultBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);
memcpy(data, pResultBuf->assistBuf, *dst);
return data;
}
static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedResultBuf* pResultBuf) { // do nothing
if (!pResultBuf->comp) {
*dst = srcSize;
return data;
}
pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
return TSDB_CODE_SUCCESS;
*dst = tsDecompressString(data, srcSize, 1, pResultBuf->assistBuf, pResultBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
memcpy(data, pResultBuf->assistBuf, *dst);
return data;
}
static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNumOfPages) {
assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize);
int32_t ret = TSDB_CODE_SUCCESS;
static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t size) {
if (pResultBuf->pFree == NULL) {
return pResultBuf->nextPos;
} else {
int32_t offset = -1;
if (pResultBuf->pBuf == NULL) {
assert(pResultBuf->fd == FD_INITIALIZER);
size_t num = taosArrayGetSize(pResultBuf->pFree);
for(int32_t i = 0; i < num; ++i) {
SFreeListItem* pi = taosArrayGet(pResultBuf->pFree, i);
if (pi->len >= size) {
offset = pi->offset;
pi->offset += size;
pi->len -= size;
if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
return ret;
return offset;
}
}
// no available recycle space, allocate new area in file
return pResultBuf->nextPos;
}
}
static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
assert(!pg->used && pg->pData != NULL);
int32_t size = -1;
char* t = doCompressData(GET_DATA_PAYLOAD(pg), pResultBuf->pageSize, &size, pResultBuf);
// this page is flushed to disk for the first time
if (pg->info.offset == -1) {
pg->info.offset = allocatePositionInFile(pResultBuf, size);
pResultBuf->nextPos += size;
fseek(pResultBuf->file, pg->info.offset, SEEK_SET);
/*int32_t ret =*/ fwrite(t, 1, size, pResultBuf->file);
if (pResultBuf->fileSize < pg->info.offset + pg->info.length) {
pResultBuf->fileSize = pg->info.offset + pg->info.length;
}
} else {
ret = munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->numOfPages += incNumOfPages;
// length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
if (pg->info.length < size) {
// 1. add current space to free list
taosArrayPush(pResultBuf->pFree, &pg->info);
// 2. allocate new position, and update the info
pg->info.offset = allocatePositionInFile(pResultBuf, size);
pResultBuf->nextPos += size;
}
/*
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
return TSDB_CODE_QRY_NO_DISKSPACE;
//3. write to disk.
fseek(pResultBuf->file, pg->info.offset, SEEK_SET);
fwrite(t, size, 1, pResultBuf->file);
if (pResultBuf->fileSize < pg->info.offset + pg->info.length) {
pResultBuf->fileSize = pg->info.offset + pg->info.length;
}
}
pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
char* ret = pg->pData;
memset(ret, 0, pResultBuf->pageSize);
pg->pData = NULL;
pg->info.length = size;
pResultBuf->statis.flushBytes += pg->info.length;
if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
return ret;
}
static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
int32_t ret = TSDB_CODE_SUCCESS;
assert(pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages);
if (pResultBuf->file == NULL) {
if ((ret = createDiskFile(pResultBuf)) != TSDB_CODE_SUCCESS) {
terrno = ret;
return NULL;
}
}
return TSDB_CODE_SUCCESS;
return doFlushPageToDisk(pResultBuf, pg);
}
#define NO_AVAILABLE_PAGES(_b) ((_b)->allocateId == (_b)->numOfPages - 1)
// load file block data in disk
static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET);
ret = fread(GET_DATA_PAYLOAD(pg), 1, pg->info.length, pResultBuf->file);
if (ret != pg->info.length) {
terrno = errno;
return NULL;
}
static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL);
pResultBuf->statis.loadBytes += pg->info.length;
int32_t fullSize = 0;
doDecompressData(GET_DATA_PAYLOAD(pg), pg->info.length, &fullSize, pResultBuf);
return GET_DATA_PAYLOAD(pg);
}
#define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages)
char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL);
SArray* pa = taosArrayInit(1, POINTER_BYTES);
int32_t ret = taosHashPut(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES);
assert(ret == 0);
return pa;
}
static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
SIDList list = NULL;
char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
return -1;
list = addNewGroup(pResultBuf, groupId);
} else {
list = (SIDList) (*p);
}
int32_t slot = GET_INT32_VAL(p);
assert(slot >= 0 && slot < taosHashGetSize(pResultBuf->idsTable));
pResultBuf->numOfPages += 1;
SPageInfo* ppi = malloc(sizeof(SPageInfo));//{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL};
ppi->info = PAGE_INFO_INITIALIZER;
ppi->pageId = pageId;
ppi->pData = NULL;
ppi->pn = NULL;
ppi->used = true;
return slot;
return *(SPageInfo**) taosArrayPush(list, &ppi);
}
static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
static SListNode* getEldestUnrefedPage(SDiskbasedResultBuf* pResultBuf) {
SListIter iter = {0};
tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD);
SArray* pa = taosArrayInit(1, sizeof(int32_t));
taosArrayPush(pResultBuf->list, &pa);
SListNode* pn = NULL;
while((pn = tdListNext(&iter)) != NULL) {
assert(pn != NULL);
assert(taosArrayGetSize(pResultBuf->list) == taosHashGetSize(pResultBuf->idsTable));
return num;
SPageInfo* pageInfo = *(SPageInfo**) pn->data;
assert(pageInfo->pageId >= 0 && pageInfo->pn == pn);
if (!pageInfo->used) {
break;
}
}
return pn;
}
static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
slot = addNewGroupId(pResultBuf, groupId);
static char* evicOneDataPage(SDiskbasedResultBuf* pResultBuf) {
char* bufPage = NULL;
SListNode* pn = getEldestUnrefedPage(pResultBuf);
// all pages are referenced by user, try to allocate new space
if (pn == NULL) {
int32_t prev = pResultBuf->inMemPages;
pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5;
qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev,
pResultBuf->inMemPages, pResultBuf->pageSize);
} else {
pResultBuf->statis.flushPages += 1;
tdListPopNode(pResultBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**) pn->data;
assert(d->pn == pn);
d->pn = NULL;
tfree(pn);
bufPage = flushPageToDisk(pResultBuf, d);
}
SIDList pList = taosArrayGetP(pResultBuf->list, slot);
taosArrayPush(pList, &pageId);
return bufPage;
}
static void lruListPushFront(SList *pList, SPageInfo* pi) {
tdListPrepend(pList, &pi);
SListNode* front = tdListGetHead(pList);
pi->pn = front;
}
static void lruListMoveToFront(SList *pList, SPageInfo* pi) {
tdListPopNode(pList, pi->pn);
tdListPrependNode(pList, pi->pn);
}
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
pResultBuf->statis.getPages += 1;
char* availablePage = NULL;
if (NO_AVAILABLE_PAGES(pResultBuf)) {
if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
return NULL;
}
availablePage = evicOneDataPage(pResultBuf);
}
// register new id in this group
*pageId = (++pResultBuf->allocateId);
registerPageId(pResultBuf, groupId, *pageId);
// clear memory for the new page
tFilePage* page = getResBufPage(pResultBuf, *pageId);
memset(page, 0, pResultBuf->pageSize);
// register page id info
SPageInfo* pi = registerPage(pResultBuf, groupId, *pageId);
// add to LRU list
assert(listNEles(pResultBuf->lruList) < pResultBuf->inMemPages && pResultBuf->inMemPages > 0);
lruListPushFront(pResultBuf->lruList, pi);
// add to hash map
taosHashPut(pResultBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
// allocate buf
if (availablePage == NULL) {
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES);
} else {
pi->pData = availablePage;
}
pResultBuf->totalBufSize += pResultBuf->pageSize;
return page;
((void**)pi->pData)[0] = pi;
pi->used = true;
return GET_DATA_PAYLOAD(pi);
}
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
assert(pResultBuf != NULL && id >= 0);
pResultBuf->statis.getPages += 1;
SPageInfo** pi = taosHashGet(pResultBuf->all, &id, sizeof(int32_t));
assert(pi != NULL && *pi != NULL);
if ((*pi)->pData != NULL) { // it is in memory
// no need to update the LRU list if only one page exists
if (pResultBuf->numOfPages == 1) {
(*pi)->used = true;
return GET_DATA_PAYLOAD(*pi);
}
SPageInfo** pInfo = (SPageInfo**) ((*pi)->pn->data);
assert(*pInfo == *pi);
lruListMoveToFront(pResultBuf->lruList, (*pi));
(*pi)->used = true;
return GET_DATA_PAYLOAD(*pi);
} else { // not in memory
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0);
char* availablePage = NULL;
if (NO_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf);
}
if (availablePage == NULL) {
(*pi)->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES);
} else {
(*pi)->pData = availablePage;
}
((void**)((*pi)->pData))[0] = (*pi);
lruListPushFront(pResultBuf->lruList, *pi);
loadPageFromDisk(pResultBuf, *pi);
return GET_DATA_PAYLOAD(*pi);
}
}
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) {
assert(pResultBuf != NULL && page != NULL);
char* p = (char*) page - POINTER_BYTES;
SPageInfo* ppi = ((SPageInfo**) p)[0];
releaseResBufPageInfo(pResultBuf, ppi);
}
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi) {
assert(pi->pData != NULL && pi->used);
pi->used = false;
pResultBuf->statis.releasePages += 1;
}
size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); }
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
assert(pResultBuf != NULL);
char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
return pResultBuf->emptyDummyIdList;
} else {
return taosArrayGetP(pResultBuf->list, slot);
return (SArray*) (*p);
}
}
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
if (pResultBuf == NULL) {
return;
}
if (FD_VALID(pResultBuf->fd)) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
if (pResultBuf->file != NULL) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file size:%"PRId64" bytes",
pResultBuf->handle, pResultBuf->totalBufSize, pResultBuf->fileSize);
close(pResultBuf->fd);
munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->pBuf = NULL;
fclose(pResultBuf->file);
} else {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", handle,
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle,
pResultBuf->totalBufSize);
}
unlink(pResultBuf->path);
tfree(pResultBuf->path);
size_t size = taosArrayGetSize(pResultBuf->list);
for (int32_t i = 0; i < size; ++i) {
SArray* pa = taosArrayGetP(pResultBuf->list, i);
taosArrayDestroy(pa);
SHashMutableIterator* iter = taosHashCreateIter(pResultBuf->groupSet);
while(taosHashIterNext(iter)) {
SArray** p = (SArray**) taosHashIterGet(iter);
size_t n = taosArrayGetSize(*p);
for(int32_t i = 0; i < n; ++i) {
SPageInfo* pi = taosArrayGetP(*p, i);
tfree(pi->pData);
tfree(pi);
}
taosArrayDestroy(pResultBuf->list);
taosArrayDestroy(*p);
}
taosHashDestroyIter(iter);
tdListFree(pResultBuf->lruList);
taosArrayDestroy(pResultBuf->emptyDummyIdList);
taosHashCleanup(pResultBuf->idsTable);
taosHashCleanup(pResultBuf->groupSet);
taosHashCleanup(pResultBuf->all);
tfree(pResultBuf->iBuf);
tfree(pResultBuf->assistBuf);
tfree(pResultBuf);
}
int32_t getLastPageId(SIDList pList) {
SPageInfo* getLastPageInfo(SIDList pList) {
size_t size = taosArrayGetSize(pList);
return *(int32_t*) taosArrayGet(pList, size - 1);
return (SPageInfo*) taosArrayGetP(pList, size - 1);
}
......@@ -237,10 +237,12 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
return;
}
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pos.pageId);
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
SResultInfo *pResultInfo = &pWindowRes->resultInfo[i];
char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes);
char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page);
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memset(s, 0, size);
......@@ -277,8 +279,11 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen);
// copy the output buffer data from src to dst, the position info keep unchanged
char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst);
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src);
tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pos.pageId);
char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage);
tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pos.pageId);
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src, srcpage);
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memcpy(dstBuf, srcBuf, s);
......
......@@ -30,7 +30,7 @@
#include <string.h>
#include <assert.h>
#include <stdbool.h>
#include "qsqlparser.h"
#include "qSqlparser.h"
#include "tcmdtype.h"
#include "tstoken.h"
#include "ttokendef.h"
......
......@@ -18,17 +18,144 @@ void simpleTest() {
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getNumOfRowsPerPage(pResultBuf), (16384L - sizeof(int64_t))/64);
ASSERT_EQ(getResBufSize(pResultBuf), 1000*16384L);
ASSERT_EQ(getResBufSize(pResultBuf), 1024);
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
destroyResultBuf(pResultBuf, NULL);
releaseResBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t == pBufPage1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage5);
destroyResultBuf(pResultBuf);
}
void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL);
int32_t pageId = 0;
int32_t writePageId = 0;
int32_t groupId = 0;
int32_t nx = 12345;
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
*(int32_t*)(pBufPage->data) = nx;
writePageId = pageId;
releaseResBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4);
// flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 5);
destroyResultBuf(pResultBuf);
}
void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1000, 64, 1024, 4, NULL);
int32_t pageId = 0;
int32_t writePageId = 0;
int32_t groupId = 0;
int32_t nx = 12345;
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
releaseResBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4);
releaseResBufPage(pResultBuf, t4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t5 = getResBufPage(pResultBuf, pageId);
ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5);
// flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
*(int32_t*)(pBufPagex->data) = nx;
writePageId = pageId; // update the data
releaseResBufPage(pResultBuf, pBufPagex);
tFilePage* pBufPagex1 = getResBufPage(pResultBuf, 1);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 6);
destroyResultBuf(pResultBuf);
}
} // namespace
TEST(testCase, resultBufferTest) {
srand(time(NULL));
simpleTest();
writeDownTest();
recyclePageTest();
}
......@@ -644,7 +644,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
.uid = (_checkInfo)->tableId.uid})
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
STsdbRepo *pRepo = pQueryHandle->pTsdb;
bool blockLoaded = false;
int64_t st = taosGetTimestampUs();
......@@ -678,8 +678,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
int64_t elapsedTime = (taosGetTimestampUs() - st);
pQueryHandle->cost.blockLoadTime += elapsedTime;
tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, elapsedTime);
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p",
pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo);
return blockLoaded;
}
......@@ -702,8 +703,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
// do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->rows = tsdbReadRowsFromCache(pCheckInfo, binfo.window.skey - step,
pQueryHandle->outputCapacity, &cur->win, pQueryHandle);
cur->rows = tsdbReadRowsFromCache(pCheckInfo, binfo.window.skey - step, pQueryHandle->outputCapacity, &cur->win, pQueryHandle);
pQueryHandle->realNumOfRows = cur->rows;
// update the last key value
......@@ -717,7 +717,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
return;
}
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
/*
......@@ -742,9 +742,9 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
SQueryFilePos* cur = &pQueryHandle->cur;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// query ended in current block
// query ended in/started from current block
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) {
return false;
}
......@@ -758,13 +758,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = 0;
}
assert(pCheckInfo->lastKey <= pBlock->keyLast);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
} else { //desc order, query ended in current block
if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) {
return false;
}
......@@ -775,6 +776,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = pBlock->numOfRows - 1;
}
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
......@@ -854,6 +856,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
TSKEY* tsArray = pCols->cols[0].pData;
int32_t num = end - start + 1;
assert(num >= 0);
if (num == 0) {
return numOfRows;
}
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
//data in buffer has greater timestamp, copy data in file block
......@@ -995,6 +1003,76 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
}
}
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) {
if (numOfRows == 0 || ASCENDING_TRAVERSE(pQueryHandle->order)) {
return;
}
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
}
static void getQualifiedRowsPos(STsdbQueryHandle* pQueryHandle, int32_t startPos, int32_t endPos,
int32_t numOfExisted, int32_t *start, int32_t *end) {
*start = -1;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
int32_t remain = endPos - startPos + 1;
if (remain + numOfExisted > pQueryHandle->outputCapacity) {
*end = (pQueryHandle->outputCapacity - numOfExisted) + startPos - 1;
} else {
*end = endPos;
}
*start = startPos;
} else {
int32_t remain = (startPos - endPos) + 1;
if (remain + numOfExisted > pQueryHandle->outputCapacity) {
*end = startPos + 1 - (pQueryHandle->outputCapacity - numOfExisted);
} else {
*end = endPos;
}
*start = *end;
*end = startPos;
}
}
static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) {
SQueryFilePos* cur = &pQueryHandle->cur;
pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
cur->pos = endPos;
}
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) {
SQueryFilePos* cur = &pQueryHandle->cur;
if (cur->rows > 0) {
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(cur->win.skey >= pQueryHandle->window.skey && cur->win.ekey <= pQueryHandle->window.ekey);
} else {
assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey);
}
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, 0);
assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]);
} else {
cur->win = pQueryHandle->window;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
cur->lastKey = pQueryHandle->window.ekey + step;
}
}
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
......@@ -1002,17 +1080,22 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData;
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = cur->pos;
if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.rows - 1;
cur->mixBlock = (cur->pos != 0);
......@@ -1026,46 +1109,34 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
}
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t pos = cur->pos;
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
TSKEY* tsArray = pCols->cols[0].pData;
int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER;
// no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
int32_t start = cur->pos;
int32_t end = endPos;
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
end = cur->pos;
start = endPos;
SWAP(start, end, int32_t);
}
cur->win.skey = tsArray[start];
cur->win.ekey = tsArray[end];
// todo opt in case of no data in buffer
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
// the time window should always be right order: skey <= ekey
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
cur->lastKey = tsArray[endPos];
pos += (end - start + 1) * step;
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
cur->blockCompleted =
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pQueryHandle);
return;
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL;
......@@ -1112,26 +1183,14 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
moveToNextRowInMem(pCheckInfo);
}
int32_t start = -1;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
int32_t remain = end - pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
}
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
start = pos;
} else {
int32_t remain = (pos - end) + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
}
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step;
start = end;
end = pos;
}
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart];
cur->lastKey = cur->win.ekey + step;
}
} while (numOfRows < pQueryHandle->outputCapacity);
......@@ -1148,54 +1207,29 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->win.skey = tsArray[pos];
}
int32_t start = -1;
int32_t end = -1;
// all remain data are qualified, but check the remain capacity in the first place.
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
int32_t remain = endPos - pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
}
start = pos;
end = endPos;
} else {
int32_t remain = pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
}
start = endPos;
end = pos;
}
int32_t start = -1, end = -1;
getQualifiedRowsPos(pQueryHandle, pos, endPos, numOfRows, &start, &end);
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start];
cur->lastKey = cur->win.ekey + step;
}
}
}
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
cur->blockCompleted =
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
}
pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
cur->pos = pos;
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pQueryHandle);
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
......@@ -1438,8 +1472,10 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
// current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
tsdbDebug("%p remain files are not qualified for qrange:%"PRId64"-%"PRId64", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo)
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle,
pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo);
pQueryHandle->pFileGroup = NULL;
assert(pQueryHandle->numOfBlocks == 0);
break;
}
......@@ -1447,8 +1483,8 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
break;
}
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, numOfTables,
pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
assert(numOfBlocks >= 0);
if (numOfBlocks == 0) {
......@@ -1456,7 +1492,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
}
// todo return error code to query engine
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
if ((code = createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks)) != TSDB_CODE_SUCCESS) {
break;
}
......@@ -1467,7 +1503,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
}
// no data in file anymore
if (pQueryHandle->numOfBlocks <= 0) {
if (pQueryHandle->numOfBlocks <= 0 || code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_SUCCESS) {
assert(pQueryHandle->pFileGroup == NULL);
}
......@@ -1477,6 +1513,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
return code;
}
assert(pQueryHandle->pFileGroup != NULL && pQueryHandle->numOfBlocks > 0);
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId;
......@@ -1842,8 +1879,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
elapsedTime, numOfRows, numOfCols);
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %p", pQueryHandle,
elapsedTime, numOfRows, numOfCols, pQueryHandle->qinfo);
return numOfRows;
}
......@@ -1958,7 +1995,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
return pHandle->pColumns;
} else { // only load the file block
SCompBlock* pBlock = pBlockInfo->compBlock;
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot);
// todo refactor
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
......@@ -2038,7 +2075,6 @@ typedef struct STableGroupSupporter {
int32_t numOfCols;
SColIndex* pCols;
STSchema* pTagSchema;
// void* tsdbMeta;
} STableGroupSupporter;
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
......@@ -2428,8 +2464,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tsdbDestroyHelper(&pQueryHandle->rhelper);
SIOCostSummary* pCost = &pQueryHandle->cost;
tsdbDebug(":io-cost summary: statis-info:%"PRId64"us, datablock:%" PRId64"us, check data:%"PRId64"us, %p",
pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo);
tsdbDebug("%p :io-cost summary: statis-info:%"PRId64"us, datablock:%" PRId64"us, check data:%"PRId64"us, %p",
pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo);
tfree(pQueryHandle);
}
......
......@@ -68,8 +68,6 @@ typedef struct {
int64_t refreshTime;
STrashElem * pTrash;
char* name;
// void * tmrCtrl;
// void * pTimer;
SCacheStatis statistics;
SHashObj * pHashTable;
__cache_free_fn_t freeFp;
......
......@@ -55,6 +55,8 @@ int tdListPrepend(SList *list, void *data);
int tdListAppend(SList *list, void *data);
SListNode *tdListPopHead(SList *list);
SListNode *tdListPopTail(SList *list);
SListNode *tdListGetHead(SList *list);
SListNode *tsListGetTail(SList *list);
SListNode *tdListPopNode(SList *list, SListNode *node);
void tdListMove(SList *src, SList *dst);
void tdListDiscard(SList *list);
......
......@@ -343,7 +343,7 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
if (ptNode != NULL) {
T_REF_INC(*ptNode);
(*ptNode)->expireTime = taosGetTimestampMs() + (*ptNode)->lifespan;
(*ptNode)->expireTime = expireTime; // taosGetTimestampMs() + (*ptNode)->lifespan;
}
__cache_unlock(pCacheObj);
......@@ -381,7 +381,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
}
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
if (pCacheObj == NULL || data == NULL) return NULL;
if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
......@@ -419,7 +419,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
// note: extend lifespan before dec ref count
bool inTrashCan = pNode->inTrashCan;
if (pCacheObj->extendLifespan && (!inTrashCan)) {
if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime);
}
......@@ -457,8 +457,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
} else {
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode);
uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, pNode->key, pNode->data, ref,
inTrashCan);
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, pNode->key, pNode->data,
ref, inTrashCan);
}
}
......@@ -572,6 +573,7 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
free(pElem);
}
// TODO add another lock when scanning trashcan
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
__cache_wr_lock(pCacheObj);
......@@ -643,6 +645,7 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t
__cache_wr_lock(pCacheObj);
while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
continue;
......@@ -674,6 +677,7 @@ void* taosCacheTimedRefresh(void *handle) {
// check if current cache object will be deleted every 500ms.
if (pCacheObj->deleting) {
uDebug("%s refresh threads quit", pCacheObj->name);
break;
}
......
#include "taosdef.h"
#include "tcompare.h"
#include <tarray.h>
#include "tarray.h"
#include "tutil.h"
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
......
......@@ -76,6 +76,7 @@ int tdListPrepend(SList *list, void *data) {
SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize);
if (node == NULL) return -1;
node->next = node->prev = NULL;
memcpy((void *)(node->data), data, list->eleSize);
tdListPrependNode(list, node);
......@@ -121,6 +122,22 @@ SListNode *tdListPopTail(SList *list) {
return node;
}
SListNode *tdListGetHead(SList *list) {
if (list == NULL || list->numOfEles == 0) {
return NULL;
}
return list->head;
}
SListNode *tsListGetTail(SList *list) {
if (list == NULL || list->numOfEles == 0) {
return NULL;
}
return list->tail;
}
SListNode *tdListPopNode(SList *list, SListNode *node) {
if (list->head == node) {
list->head = node->next;
......
......@@ -71,11 +71,45 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.handle = NULL;
atomic_add_fetch_32(&pVnode->refCount, 1);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) {
bool continueExec = false;
int32_t code = TSDB_CODE_SUCCESS;
if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) {
vDebug("QInfo:%p add to query task queue for exec", handle);
vnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = handle;
*freeHandle = false;
} else {
vDebug("QInfo:%p exec completed", handle);
*freeHandle = true;
}
} else {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
*freeHandle = true;
}
return code;
}
static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->completed = true;
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void *pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen;
......@@ -98,6 +132,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else {
assert(*qhandle == (void*) killQueryMsg->qhandle);
qKillQuery(*qhandle);
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
}
......@@ -110,7 +145,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (contLen != 0) {
qinfo_t pQInfo = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, &pQInfo);
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
......@@ -133,7 +168,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp->qhandle = htobe64((uint64_t) pQInfo);
}
pQInfo = NULL;
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
......@@ -153,16 +187,34 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} else {
assert(pCont != NULL);
handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
if (handle == NULL) {
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_QRY_INVALID_QHANDLE;
} else {
vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);
code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
qTableQuery(*handle); // do execute query
bool freehandle = false;
bool buildRes = qTableQuery(*handle); // do execute query
// build query rsp
if (buildRes) {
// update the connection info according to the retrieve connection
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
assert(pReadMsg->rpcMsg.handle != NULL);
vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);
// todo test the error code case
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_HAS_RSP;
}
}
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freehandle);
}
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
}
return code;
......@@ -176,7 +228,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, (void*) pRetrieve->qhandle);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);
memset(pRet, 0, sizeof(SRspRet));
......@@ -186,15 +238,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->useconds = 0;
pRsp->completed = true;
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
......@@ -203,35 +247,25 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->completed = true;
pRsp->useconds = 0;
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
bool freeHandle = true;
code = qRetrieveQueryResultInfo(*handle);
bool buildRes = false;
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
if (code != TSDB_CODE_SUCCESS) {
//TODO handle malloc failure
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else { // if failed to dump result, free qhandle immediately
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
if (qHasMoreResultsToRetrieve(*handle)) {
vnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle;
freeHandle = false;
} else {
qKillQuery(*handle);
freeHandle = true;
}
} else { // result is not ready, return immediately
if (!buildRes) {
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
return TSDB_CODE_QRY_NOT_READY;
}
code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
}
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
......
......@@ -93,6 +93,8 @@ run general/parser/groupby.sim
sleep 2000
run general/parser/tags_filter.sim
sleep 2000
run general/parser/topbot.sim
sleep 2000
run general/parser/union.sim
sleep 2000
run general/parser/sliding.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
$dbPrefix = tb_db
$tbPrefix = tb_tb
$stbPrefix = tb_stb
$tbNum = 10
$rowNum = 1000
$totalNum = $tbNum * $rowNum
$loops = 200000
$log = 10000
$ts0 = 1537146000000
$delta = 600000
print ========== topbot.sim
$i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db cache 16 maxtables 200
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
$i = 0
$ts = $ts0
$halfNum = $tbNum / 2
while $i < $halfNum
$tbId = $i + $halfNum
$tb = $tbPrefix . $i
$tb1 = $tbPrefix . $tbId
sql create table $tb using $stb tags( $i )
sql create table $tb1 using $stb tags( $tbId )
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar )
sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
endw
print ====== tables created
sql use $db
##### select from table
print ====== select top/bot from table and check num of rows returned
sql select top(c1, 100) from tb_stb0
if $row != 100 then
return -1
endi
sql select last(c2) from tb_tb9
if $row != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册