Commit 26e4efe5 authored by: H hjxilinx

refactor code for query intermediate buffer.

Parent 1de610a4
......@@ -90,14 +90,14 @@ typedef struct SSubqueryState {
} SSubqueryState;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
SColumnModel * pFinalColModel; // colModel for final result
SColumnModel * pFinalColModel; // colModel for final result
SSubqueryState * pState;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSqlObj;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
pthread_mutex_t queryMutex;
} SRetrieveSupport;
......
......@@ -387,18 +387,19 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
int32_t numOfRows, int32_t orderType) {
if (pPage->numOfElems + numOfRows <= pDesc->pColumnModel->capacity) {
tColModelAppend(pDesc->pColumnModel, pPage, data, 0, numOfRows, numOfRows);
SColumnModel *pModel = pDesc->pColumnModel;
if (pPage->numOfElems + numOfRows <= pModel->capacity) {
tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows);
return 0;
}
SColumnModel *pModel = pDesc->pColumnModel;
int32_t numOfRemainEntries = pDesc->pColumnModel->capacity - pPage->numOfElems;
// current buffer would overflow, flush data to the external memory buffer (tExtMemBuffer)
int32_t numOfRemainEntries = pModel->capacity - pPage->numOfElems;
tColModelAppend(pModel, pPage, data, 0, numOfRemainEntries, numOfRows);
/* current buffer is full, need to flushed to disk */
assert(pPage->numOfElems == pDesc->pColumnModel->capacity);
// current buffer is full, needs to be flushed to disk
assert(pPage->numOfElems == pModel->capacity);
int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType);
if (ret != 0) {
return -1;
......
......@@ -1202,6 +1202,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret < 0) {
......
......@@ -676,7 +676,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
STableDataBlocks** dataBlocks) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosGetDataFromHash(pHashList, (const char*)&id, sizeof(id));
STableDataBlocks** t1 = (STableDataBlocks**)taosGetDataFromHashTable(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) {
*dataBlocks = *t1;
}
......
......@@ -43,10 +43,10 @@ typedef struct SHashEntry {
typedef struct HashObj {
SHashEntry **hashList;
uint32_t capacity;
int size;
_hash_fn_t hashFp;
bool multithreadSafe; // enable lock
uint32_t capacity; // number of slots
int size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
bool multithreadSafe; // enable lock or not
#if defined LINUX
pthread_rwlock_t lock;
......@@ -57,11 +57,13 @@ typedef struct HashObj {
} HashObj;
void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe);
void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen);
int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void *data, uint32_t size);
void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen);
int32_t taosNumElemsInHashTable(HashObj *pObj);
char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen);
char *taosGetDataFromHash(HashObj *pObj, const char *key, uint32_t keyLen);
void taosCleanUpHashTable(void *handle);
......
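For orientation, here is a minimal usage sketch of the renamed hash API above, modeled on how createResultBuf() uses it later in this commit. It is illustrative only: the includes, the capacity value of 128, and the availability of taosGetDefaultHashFunction()/TSDB_DATA_TYPE_INT from these headers are assumptions, not part of the change.

#include <assert.h>
#include "hash.h"

static void hashTableUsageSketch(void) {
  // 128 slots, int32 keys, no locking; the capacity is an arbitrary illustrative value
  HashObj *pHash = taosInitHashTable(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);

  int32_t key = 5;
  int32_t value = 42;
  // the value bytes are copied into the table, so passing a stack variable is fine here
  taosAddToHashTable(pHash, (const char *)&key, sizeof(key), &value, sizeof(value));

  // lookups now go through taosGetDataFromHashTable (formerly taosGetDataFromHash); NULL means the key is absent
  int32_t *p = (int32_t *)taosGetDataFromHashTable(pHash, (const char *)&key, sizeof(key));
  assert(p != NULL && *p == 42);
  assert(taosNumElemsInHashTable(pHash) == 1);

  taosDeleteFromHashTable(pHash, (const char *)&key, sizeof(key));
  taosCleanUpHashTable(pHash);
}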
#ifndef TDENGINE_VNODEQUERYUTIL_H
#define TDENGINE_VNODEQUERYUTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "textbuffer.h"
typedef struct SIDList {
uint32_t alloc;
int32_t size;
int32_t* pData;
} SIDList;
typedef struct SQueryResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
int32_t fd; // data file fd
int32_t ifd; // index file fd
int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages
char* pBuf; // mmap buffer pointer
char* path; // file path
char* ipath; // index file path
int32_t* pIndexData; // index file data
char* internBuf; // intern buf
int32_t internfd; // intern fd
char* internpath;
uint32_t numOfAllocGroupIds; // number of allocated id list
void* idsTable; // id hash table
SIDList* list; // for each id, there is a page id list
} SQueryResultBuf;
/**
* create disk-based result buffer
* @param pResultBuf
* @param size
* @param rowSize
* @return
*/
int32_t createResultBuf(SQueryResultBuf** pResultBuf, int32_t size, int32_t rowSize);
/**
*
* @param pResultBuf
* @param groupId
* @param pageId
* @return
*/
tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pResultBuf
* @return
*/
int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf);
/**
*
* @param pResultBuf
* @param groupId
* @return
*/
SIDList getDataBufPagesIdList(SQueryResultBuf* pResultBuf, int32_t groupId);
/**
* get the specified buffer page by id
* @param pResultBuf
* @param id
* @return
*/
tFilePage* getResultBufferPageById(SQueryResultBuf* pResultBuf, int32_t id);
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
* @return
*/
int32_t getResBufSize(SQueryResultBuf* pResultBuf);
/**
* get the number of groups in the result buffer
* @param pResultBuf
* @return
*/
int32_t getNumOfResultBufGroupId(SQueryResultBuf* pResultBuf);
/**
* destroy result buffer
* @param pResultBuf
*/
void destroyResultBuf(SQueryResultBuf* pResultBuf);
/**
*
* @param pList
* @return
*/
int32_t getLastPageId(SIDList *pList);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEQUERYUTIL_H
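As a quick orientation for the new disk-based result buffer API above, the sketch below walks the page id list of one group the way getMeterDataPage() and copyResToQueryResultBuf() do later in this commit. The function name is invented for illustration, and the assumption that this header is installed as tresultBuf.h follows the includes used elsewhere in the commit.

#include "tresultBuf.h"

// illustrative: sum the rows stored across all pages registered for one group id
static int32_t countRowsOfGroup(SQueryResultBuf* pResultBuf, int32_t groupId) {
  // an unknown group id yields an empty list (size == 0), so the loop simply does nothing
  SIDList list = getDataBufPagesIdList(pResultBuf, groupId);

  int32_t total = 0;
  for (int32_t i = 0; i < list.size; ++i) {
    tFilePage* pPage = getResultBufferPageById(pResultBuf, list.pData[i]);
    total += pPage->numOfElems;
  }

  return total;
}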
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEQUERYUTIL_H
#define TDENGINE_VNODEQUERYUTIL_H
#ifndef TDENGINE_VNODEQUERYIMPL_H
#define TDENGINE_VNODEQUERYIMPL_H
#ifdef __cplusplus
extern "C" {
......@@ -120,7 +120,7 @@ typedef enum {
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
return *(SMeterObj**)taosGetDataFromHash(hashHandle, (const char*) &sid, sizeof(sid));
return *(SMeterObj**)taosGetDataFromHashTable(hashHandle, (const char*) &sid, sizeof(sid));
}
bool isQueryKilled(SQuery* pQuery);
......@@ -209,7 +209,7 @@ int32_t vnodeGetHeaderFile(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex);
* @param ekey
* @return
*/
SMeterQueryInfo* createMeterQueryInfo(SQuery* pQuery, TSKEY skey, TSKEY ekey);
SMeterQueryInfo* createMeterQueryInfo(SQuery* pQuery, int32_t sid, TSKEY skey, TSKEY ekey);
/**
* Destroy meter query info
......@@ -224,7 +224,7 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols);
* @param skey
* @param ekey
*/
void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey);
void changeMeterQueryInfoForSuppleQuery(SQueryResultBuf* pResultBuf, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey);
/**
* add the new allocated disk page to meter query info
......@@ -289,4 +289,4 @@ void closeAllSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo);
}
#endif
#endif // TDENGINE_VNODEQUERYUTIL_H
#endif // TDENGINE_VNODEQUERYIMPL_H
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "os.h"
#include "tresultBuf.h"
#include "tinterpolation.h"
#include "vnodeTagMgmt.h"
......@@ -182,8 +183,8 @@ typedef struct SMeterQueryInfo {
int64_t skey;
int64_t ekey;
int32_t numOfRes;
uint32_t numOfPages;
uint32_t numOfAlloc;
// uint32_t numOfPages;
// uint32_t numOfAlloc;
int32_t reverseIndex; // reversed output indicator, start from (numOfRes-1)
int16_t reverseFillRes; // denote if reverse fill the results in supplementary scan required or not
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
......@@ -191,7 +192,9 @@ typedef struct SMeterQueryInfo {
int64_t tag;
STSCursor cur;
SResultInfo* resultInfo;
uint32_t* pageList;
// uint32_t* pageList;
// SIDList pageIdList;
int32_t sid; // for retrieve the page id list
} SMeterQueryInfo;
typedef struct SMeterDataInfo {
......@@ -235,15 +238,15 @@ typedef struct SMeterQuerySupportObj {
*/
int32_t meterIdx;
int32_t meterOutputFd;
int32_t lastPageId;
int32_t numOfPages;
// int32_t meterOutputFd;
// int32_t lastPageId;
// int32_t numOfPages;
int32_t numOfGroupResultPages;
int32_t groupResultSize;
char* meterOutputMMapBuf;
int64_t bufSize;
char extBufFile[256]; // external file name
SQueryResultBuf* pResultBuf;
// char* meterOutputMMapBuf;
// int64_t bufSize;
// char extBufFile[256]; // external file name
SMeterDataInfo* pMeterDataInfo;
......
......@@ -56,7 +56,7 @@ static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo
static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn,
bool loadData);
static int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery *pQuery,
SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterHeadDataInfo,
SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterDataInfo,
int32_t start, int32_t end);
static TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv *pRuntimeEnv, SCacheBlock *pBlock, int32_t index);
......@@ -1576,7 +1576,7 @@ static SOutputRes *doSetSlidingWindowFromKey(SSlidingWindowInfo *pSlidingWindowI
SWindowStatus **pStatus) {
int32_t p = -1;
int32_t *p1 = (int32_t *)taosGetDataFromHash(pSlidingWindowInfo->hashList, pData, bytes);
int32_t *p1 = (int32_t *)taosGetDataFromHashTable(pSlidingWindowInfo->hashList, pData, bytes);
if (p1 != NULL) {
p = *p1;
......@@ -1706,7 +1706,7 @@ void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_
for(int32_t k = 0; k < pSlidingWindowInfo->size; ++k) {
SWindowStatus* pStatus = &pSlidingWindowInfo->pStatus[k];
int32_t *p = (int32_t*) taosGetDataFromHash(pSlidingWindowInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE);
int32_t *p = (int32_t*) taosGetDataFromHashTable(pSlidingWindowInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE);
int32_t v = *p;
v = (v - i);
......@@ -2653,6 +2653,7 @@ static int64_t getOldestKey(int32_t numOfFiles, int64_t fileId, SVnodeCfg *pCfg)
}
bool isQueryKilled(SQuery *pQuery) {
return false;
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
/*
......@@ -4439,14 +4440,14 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
}
}
if (FD_VALID(pSupporter->meterOutputFd)) {
assert(pSupporter->meterOutputMMapBuf != NULL);
dTrace("QInfo:%p disk-based output buffer during query:%" PRId64 " bytes", pQInfo, pSupporter->bufSize);
munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
tclose(pSupporter->meterOutputFd);
// if (FD_VALID(pSupporter->meterOutputFd)) {
// assert(pSupporter->meterOutputMMapBuf != NULL);
// dTrace("QInfo:%p disk-based output buffer during query:%" PRId64 " bytes", pQInfo, pSupporter->bufSize);
// munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
// tclose(pSupporter->meterOutputFd);
unlink(pSupporter->extBufFile);
}
// unlink(pSupporter->extBufFile);
// }
tSidSetDestroy(&pSupporter->pSidSet);
......@@ -4542,33 +4543,39 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
}
if (pQuery->nAggTimeInterval != 0) {
getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile);
pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666);
// getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile);
// pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666);
if (!FD_VALID(pSupporter->meterOutputFd)) {
dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
// if (!FD_VALID(pSupporter->meterOutputFd)) {
// dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
// return TSDB_CODE_SERV_OUT_OF_MEMORY;
// }
pSupporter->numOfPages = pSupporter->numOfMeters;
// pSupporter->numOfPages = pSupporter->numOfMeters;
ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
// ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
// if (ret != TSDB_CODE_SUCCESS) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
// return TSDB_CODE_SERV_NO_DISKSPACE;
// }
//
// one page for each table at least
ret = createResultBuf(&pSupporter->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize);
if (ret != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
strerror(errno));
return TSDB_CODE_SERV_NO_DISKSPACE;
return ret;
}
pRuntimeEnv->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
pSupporter->lastPageId = -1;
pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
pSupporter->meterOutputMMapBuf =
mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
// pSupporter->lastPageId = -1;
// pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
// pSupporter->meterOutputMMapBuf =
// mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
// if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
// return TSDB_CODE_SERV_OUT_OF_MEMORY;
// }
}
// metric query do not invoke interpolation, it will be done at the second-stage merge
......@@ -5261,19 +5268,9 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
}
}
static tFilePage *getFilePage(SMeterQuerySupportObj *pSupporter, int32_t pageId) {
assert(pageId <= pSupporter->lastPageId && pageId >= 0);
return (tFilePage *)(pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * pageId);
}
static tFilePage *getMeterDataPage(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo, int32_t pageId) {
SMeterQueryInfo *pMeterQueryInfo = pMeterDataInfo->pMeterQInfo;
if (pageId >= pMeterQueryInfo->numOfPages) {
return NULL;
}
int32_t realId = pMeterQueryInfo->pageList[pageId];
return getFilePage(pSupporter, realId);
static tFilePage *getMeterDataPage(SQueryResultBuf* pResultBuf, SMeterQueryInfo *pMeterQueryInfo, int32_t index) {
SIDList pList = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
return getResultBufferPageById(pResultBuf, pList.pData[index]);
}
typedef struct Position {
......@@ -5289,7 +5286,9 @@ typedef struct SCompSupporter {
int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) {
Position * pPos = &pSupportor->pPosition[meterIdx];
tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter, pSupportor->pMeterDataInfo[meterIdx], pPos->pageIdx);
tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter->pResultBuf,
pSupportor->pMeterDataInfo[meterIdx]->pMeterQInfo, pPos->pageIdx);
return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx);
}
......@@ -5297,10 +5296,11 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param
int32_t left = *(int32_t *)pLeft;
int32_t right = *(int32_t *)pRight;
SCompSupporter *supportor = (SCompSupporter *)param;
Position leftPos = supportor->pPosition[left];
Position rightPos = supportor->pPosition[right];
SCompSupporter *supporter = (SCompSupporter *)param;
SQueryResultBuf* pResultBuf = supporter->pSupporter->pResultBuf;
Position leftPos = supporter->pPosition[left];
Position rightPos = supporter->pPosition[right];
/* left source is exhausted */
if (leftPos.pageIdx == -1 && leftPos.rowIdx == -1) {
......@@ -5312,10 +5312,10 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param
return -1;
}
tFilePage *pPageLeft = getMeterDataPage(supportor->pSupporter, supportor->pMeterDataInfo[left], leftPos.pageIdx);
tFilePage *pPageLeft = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[left]->pMeterQInfo, leftPos.pageIdx);
int64_t leftTimestamp = *(int64_t *)(pPageLeft->data + TSDB_KEYSIZE * leftPos.rowIdx);
tFilePage *pPageRight = getMeterDataPage(supportor->pSupporter, supportor->pMeterDataInfo[right], rightPos.pageIdx);
tFilePage *pPageRight = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[right]->pMeterQInfo, rightPos.pageIdx);
int64_t rightTimestamp = *(int64_t *)(pPageRight->data + TSDB_KEYSIZE * rightPos.rowIdx);
if (leftTimestamp == rightTimestamp) {
......@@ -5375,23 +5375,47 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery)
}
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
char * pStart = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * (pSupporter->lastPageId + 1) +
pSupporter->groupResultSize * pSupporter->offset;
uint64_t numOfElem = ((tFilePage *)pStart)->numOfElems;
assert(numOfElem <= pQuery->pointsToRead);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memcpy(pQuery->sdata[i], pStart, pRuntimeEnv->pCtx[i].outputBytes * numOfElem + sizeof(tFilePage));
pStart += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage);
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
SIDList list = getDataBufPagesIdList(pResultBuf, 200000 + pSupporter->offset);
// char * pStart = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * (pSupporter->lastPageId + 1) +
// pSupporter->groupResultSize * pSupporter->offset;
// uint64_t numOfElem = ((tFilePage *)pStart)->numOfElems;
// assert(numOfElem <= pQuery->pointsToRead);
int32_t total = 0;
for(int32_t i = 0; i < list.size; ++i) {
tFilePage* pData = getResultBufferPageById(pResultBuf, list.pData[i]);
total += pData->numOfElems;
}
pQuery->sdata[0]->len = total;
int32_t offset = 0;
for(int32_t num = 0; num < list.size; ++num) {
tFilePage* pData = getResultBufferPageById(pResultBuf, list.pData[num]);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
memcpy(pQuery->sdata[i]->data + pRuntimeEnv->offset[i] * total + offset * bytes,
pData->data + pRuntimeEnv->offset[i] * pData->numOfElems,
bytes * pData->numOfElems);
// pStart += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage);
}
offset += pData->numOfElems;
// pQuery->sdata[0]->len += pData->numOfElems;
}
pQuery->pointsRead += numOfElem;
assert(pQuery->pointsRead == 0);
pQuery->pointsRead += pQuery->sdata[0]->len;
pSupporter->offset += 1;
}
int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv,
SMeterDataInfo *pMeterHeadDataInfo, int32_t start, int32_t end) {
SMeterDataInfo *pMeterDataInfo, int32_t start, int32_t end) {
// calculate the maximum required space
if (pSupporter->groupResultSize == 0) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
......@@ -5405,8 +5429,11 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
int32_t numOfMeters = 0;
for (int32_t i = start; i < end; ++i) {
if (pMeterHeadDataInfo[i].pMeterQInfo->numOfPages > 0 && pMeterHeadDataInfo[i].pMeterQInfo->numOfRes > 0) {
pValidMeter[numOfMeters] = &pMeterHeadDataInfo[i];
int32_t sid = pMeterDataInfo[i].pMeterQInfo->sid;
SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, sid);
if (list.size > 0 && pMeterDataInfo[i].pMeterQInfo->numOfRes > 0) {
pValidMeter[numOfMeters] = &pMeterDataInfo[i];
// set the merge start position: page:0, index:0
posArray[numOfMeters].pageIdx = 0;
posArray[numOfMeters++].rowIdx = 0;
......@@ -5435,10 +5462,12 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
while (1) {
int32_t pos = pTree->pNode[0].index;
Position * position = &cs.pPosition[pos];
tFilePage *pPage = getMeterDataPage(cs.pSupporter, pValidMeter[pos], position->pageIdx);
SQueryResultBuf* pResultBuf = cs.pSupporter->pResultBuf;
tFilePage *pPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx);
int64_t ts = getCurrentTimestamp(&cs, pos);
if (ts == lastTimestamp) { // merge with the last one
printf("++++++++++++++++++++++%d, %d, %lld\n", position->pageIdx, pos, ts);
if (ts == lastTimestamp) {// merge with the last one
doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, true);
} else {
// copy data to disk buffer
......@@ -5450,7 +5479,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
resetMergeResultBuf(pQuery, pCtx);
}
pPage = getMeterDataPage(cs.pSupporter, pValidMeter[pos], position->pageIdx);
pPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx);
if (pPage->numOfElems <= 0) { // current source data page is empty
// do nothing
} else {
......@@ -5466,17 +5495,19 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
cs.pPosition[pos].pageIdx += 1; // try next page
// check if current page is empty or not. if it is empty, ignore it and try next
if (cs.pPosition[pos].pageIdx <= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages - 1) {
tFilePage *newPage = getMeterDataPage(cs.pSupporter, pValidMeter[pos], position->pageIdx);
SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid);
if (cs.pPosition[pos].pageIdx <= list.size - 1) {
tFilePage *newPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx);
// if current source data page is null, it must be the last page of source output page
if (newPage->numOfElems <= 0) {
// if current source data page is null, it must be the last page of source output page
cs.pPosition[pos].pageIdx += 1;
assert(cs.pPosition[pos].pageIdx >= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages - 1);
assert(cs.pPosition[pos].pageIdx >= list.size - 1);
}
}
// the following code must be executed if current source pages are exhausted
if (cs.pPosition[pos].pageIdx >= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages) {
if (cs.pPosition[pos].pageIdx >= list.size) {
cs.pPosition[pos].pageIdx = -1;
cs.pPosition[pos].rowIdx = -1;
......@@ -5494,8 +5525,8 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
if (buffer[0]->numOfElems != 0) { // there are data in buffer
if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery),
pSupporter->extBufFile);
// dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery),
// pSupporter->extBufFile);
tfree(pTree);
tfree(pValidMeter);
tfree(posArray);
......@@ -5520,69 +5551,100 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
return pSupporter->numOfGroupResultPages;
}
static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) {
assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize);
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
pSupporter->numOfPages = numOfPages;
/*
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
if (ret != 0) {
dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE;
pQInfo->killed = 1;
return pQInfo->code;
}
pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
pSupporter->meterOutputMMapBuf =
mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
pQInfo->killed = 1;
return pQInfo->code;
}
return TSDB_CODE_SUCCESS;
}
//static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) {
// assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize);
//
// SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
//
// int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
// pSupporter->numOfPages = numOfPages;
//
// /*
// * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
// * be insufficient
// */
// ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
// if (ret != 0) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
// pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE;
// pQInfo->killed = 1;
//
// return pQInfo->code;
// }
//
// pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
// pSupporter->meterOutputMMapBuf =
// mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
//
// if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
// pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
// pQInfo->killed = 1;
//
// return pQInfo->code;
// }
//
// return TSDB_CODE_SUCCESS;
//}
int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv) {
int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1;
int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE +
pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1);
printf("group===============%d\n", pSupporter->numOfGroupResultPages);
// int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1;
// int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE +
// pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1);
//
// int32_t requiredPages = pSupporter->numOfPages;
// if (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) {
// while (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) {
// requiredPages += pSupporter->numOfMeters;
// }
//
// if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) {
// return -1;
// }
// }
int32_t requiredPages = pSupporter->numOfPages;
if (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) {
while (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) {
requiredPages += pSupporter->numOfMeters;
}
// char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages +
// pSupporter->groupResultSize * pSupporter->numOfGroupResultPages;
if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) {
return -1;
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage))/ pQuery->rowSize;
// the base value for group results, since the number of tables in each vnode will not exceed 100,000.
int32_t base = 200000;
int32_t pageId = -1;
int32_t remain = pQuery->sdata[0]->len;
int32_t offset = 0;
while(remain > 0) {
int32_t r = remain;
if (r > capacity) {
r = capacity;
}
tFilePage* buf = getNewDataBuf(pResultBuf, base + pSupporter->numOfGroupResultPages, &pageId);
//pagewise copy to dest buffer
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
buf->numOfElems = r;
memcpy(buf->data + pRuntimeEnv->offset[i] * buf->numOfElems, ((char*)pQuery->sdata[i]->data) + offset * bytes, buf->numOfElems * bytes);
}
offset += r;
remain -= r;
}
char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages +
pSupporter->groupResultSize * pSupporter->numOfGroupResultPages;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t size = pRuntimeEnv->pCtx[i].outputBytes * pQuery->sdata[0]->len + sizeof(tFilePage);
memcpy(lastPosition, pQuery->sdata[i], size);
lastPosition += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage);
}
// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// int32_t size = pRuntimeEnv->pCtx[i].outputBytes * pQuery->sdata[0]->len + sizeof(tFilePage);
// memcpy(lastPosition, pQuery->sdata[i], size);
//
// lastPosition += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage);
// }
pSupporter->numOfGroupResultPages += 1;
return TSDB_CODE_SUCCESS;
......@@ -6308,7 +6370,7 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet
pOneMeterDataInfo->offsetInHeaderFile = (uint64_t)compHeader->compInfoOffset;
if (pOneMeterDataInfo->pMeterQInfo == NULL) {
pOneMeterDataInfo->pMeterQInfo = createMeterQueryInfo(pQuery, pSupporter->rawSKey, pSupporter->rawEKey);
pOneMeterDataInfo->pMeterQInfo = createMeterQueryInfo(pQuery, pMeterObj->sid, pSupporter->rawSKey, pSupporter->rawEKey);
}
(*pReqMeterDataInfo)[*numOfMeters] = pOneMeterDataInfo;
......@@ -6327,18 +6389,18 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet
return TSDB_CODE_SUCCESS;
}
SMeterQueryInfo *createMeterQueryInfo(SQuery *pQuery, TSKEY skey, TSKEY ekey) {
SMeterQueryInfo *createMeterQueryInfo(SQuery *pQuery, int32_t sid, TSKEY skey, TSKEY ekey) {
SMeterQueryInfo *pMeterQueryInfo = calloc(1, sizeof(SMeterQueryInfo));
pMeterQueryInfo->skey = skey;
pMeterQueryInfo->ekey = ekey;
pMeterQueryInfo->lastKey = skey;
pMeterQueryInfo->numOfPages = 0;
pMeterQueryInfo->numOfAlloc = INIT_ALLOCATE_DISK_PAGES;
pMeterQueryInfo->pageList = calloc(pMeterQueryInfo->numOfAlloc, sizeof(uint32_t));
// pMeterQueryInfo->numOfPages = 0;
// pMeterQueryInfo->numOfAlloc = INIT_ALLOCATE_DISK_PAGES;
// pMeterQueryInfo->pageList = calloc(pMeterQueryInfo->numOfAlloc, sizeof(uint32_t));
pMeterQueryInfo->lastResRows = 0;
pMeterQueryInfo->sid = sid;
pMeterQueryInfo->cur.vnodeIndex = -1;
pMeterQueryInfo->resultInfo = calloc((size_t)pQuery->numOfOutputCols, sizeof(SResultInfo));
......@@ -6355,7 +6417,7 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols)
return;
}
free(pMeterQueryInfo->pageList);
// free(pMeterQueryInfo->pageList);
for (int32_t i = 0; i < numOfCols; ++i) {
tfree(pMeterQueryInfo->resultInfo[i].interResultBuf);
}
......@@ -6364,7 +6426,8 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols)
free(pMeterQueryInfo);
}
void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey) {
void changeMeterQueryInfoForSuppleQuery(SQueryResultBuf* pResultBuf, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey,
TSKEY ekey) {
if (pMeterQueryInfo == NULL) {
return;
}
......@@ -6378,7 +6441,9 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY
pMeterQueryInfo->cur.vnodeIndex = -1;
// previous does not generate any results
if (pMeterQueryInfo->numOfPages == 0) {
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
if (list.size == 0) {
pMeterQueryInfo->reverseFillRes = 0;
} else {
pMeterQueryInfo->reverseIndex = pMeterQueryInfo->numOfRes;
......@@ -6386,34 +6451,34 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY
}
}
static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) {
if (pSupporter->lastPageId == pSupporter->numOfPages - 1) {
if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) {
return NULL;
}
}
*pageId = (++pSupporter->lastPageId);
return getFilePage(pSupporter, *pageId);
}
tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo,
SMeterQuerySupportObj *pSupporter) {
uint32_t pageId = 0;
tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId);
if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results
return NULL;
}
if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) {
pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1;
pMeterQueryInfo->pageList = realloc(pMeterQueryInfo->pageList, sizeof(uint32_t) * pMeterQueryInfo->numOfAlloc);
}
pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages++] = pageId;
return pPage;
}
//static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) {
// if (pSupporter->lastPageId == pSupporter->numOfPages - 1) {
// if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) {
// return NULL;
// }
// }
//
// *pageId = (++pSupporter->lastPageId);
// return getFilePage(pSupporter, *pageId);
//}
//tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo,
// SMeterQuerySupportObj *pSupporter) {
// uint32_t pageId = 0;
//
// tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId);
// if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results
// return NULL;
// }
//
// if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) {
// pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1;
// pMeterQueryInfo->pageList = realloc(pMeterQueryInfo->pageList, sizeof(uint32_t) * pMeterQueryInfo->numOfAlloc);
// }
//
// pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages++] = pageId;
// return pPage;
//}
void saveIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -6869,12 +6934,16 @@ void setCtxOutputPointerForSupplementScan(SMeterQuerySupportObj *pSupporter, SMe
tFilePage *pData = NULL;
int32_t i = 0;
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
// find the position for this output result
for (; i < pMeterQueryInfo->numOfPages; ++i) {
pData = getFilePage(pSupporter, pMeterQueryInfo->pageList[i]);
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
for (; i < list.size; ++i) {
pData = getResultBufferPageById(pResultBuf, list.pData[i]);
if (index <= pData->numOfElems) {
break;
}
index -= pData->numOfElems;
}
......@@ -6936,17 +7005,23 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
tFilePage * pData = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
// SQuery *pQuery = pRuntimeEnv->pQuery;
// in the first scan, new space needed for results
if (pMeterQueryInfo->numOfPages == 0) {
pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter);
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
int32_t pageId = -1;
if (list.size == 0) {
// pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter);
pData = getNewDataBuf(pResultBuf, pMeterQueryInfo->sid, &pageId);
} else {
int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1];
pData = getFilePage(pSupporter, lastPageId);
// int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1];
pData = getResultBufferPageById(pResultBuf, getLastPageId(&list));
// pData = getFilePage(pSupporter, lastPageId);
printf("==============%d\n", pData->numOfElems);
if (pData->numOfElems >= pRuntimeEnv->numOfRowsPerPage) {
pData = addDataPageForMeterQueryInfo(pRuntimeEnv->pQuery, pMeterQueryInfo, pSupporter);
pData = getNewDataBuf(pResultBuf, pMeterQueryInfo->sid, &pageId);
if (pData != NULL) {
assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer
}
......@@ -7246,7 +7321,9 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM
assert(completed);
if (pQuery->ekey == pSupporter->rawEKey) {
// while the interval time window is less than the time range gap between two points, nextKey may be greater than
// pSupporter->rawEKey
if (pQuery->ekey == pSupporter->rawEKey || nextKey > pSupporter->rawEKey) {
/* whole query completed, save result and abort */
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
......@@ -7495,8 +7572,12 @@ bool onDemandLoadDatablock(SQuery *pQuery, int16_t queryRangeSet) {
static void validateResultBuf(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pSupporter->runtimeEnv.pQuery;
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
tFilePage *newOutput = getFilePage(pSupporter, pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]);
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
int32_t id = getLastPageId(&list);
tFilePage* newOutput = getResultBufferPageById(pResultBuf, id);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
assert(pRuntimeEnv->pCtx[i].aOutputBuf - newOutput->data < DEFAULT_INTERN_BUF_SIZE);
}
......@@ -7549,12 +7630,15 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue
pMeterQueryInfo->reverseIndex -= 1;
setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo);
} else {
int32_t pageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1];
tFilePage *pData = getFilePage(pSupporter, pageId);
SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, pMeterQueryInfo->sid);
int32_t pageId = getLastPageId(&list);
tFilePage* pData = getResultBufferPageById(pSupporter->pResultBuf, pageId);
// in handling records occurring around '1970-01-01', the aligned start timestamp may be 0.
TSKEY ts = *(TSKEY *)getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, 0);
printf("-----------------------%d\n", pData->numOfElems);
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
qTrace("QInfo:%p vid:%d sid:%d id:%s, save results, ts:%" PRId64 ", total:%d", GET_QINFO_ADDR(pQuery),
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, ts, pMeterQueryInfo->numOfRes + 1);
......@@ -7592,7 +7676,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue
return TSDB_CODE_SUCCESS;
}
static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) {
static int32_t getNumOfSubset(SMeterQuerySupportObj *pSupporter) {
SQuery *pQuery = pSupporter->runtimeEnv.pQuery;
int32_t totalSubset = 0;
......@@ -7615,7 +7699,7 @@ static int32_t doCopyFromGroupBuf(SMeterQuerySupportObj *pSupporter, SOutputRes
dTrace("QInfo:%p start to copy data to dest buf", GET_QINFO_ADDR(pSupporter->runtimeEnv.pQuery));
int32_t totalSubset = getSubsetNumber(pSupporter);
int32_t totalSubset = getNumOfSubset(pSupporter);
if (orderType == TSQL_SO_ASC) {
startIdx = pSupporter->subgroupIdx;
......@@ -7908,18 +7992,10 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
ret -= pQuery->limit.offset;
// todo !!!!there exactly number of interpo is not valid.
// todo refactor move to the beginning of buffer
// if (QUERY_IS_ASC_QUERY(pQuery)) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].resBytes * pQuery->limit.offset,
ret * pQuery->pSelectExpr[i].resBytes);
}
// } else {
// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// memmove(pDst[i]->data + (pQuery->pointsToRead - ret) * pQuery->pSelectExpr[i].resBytes,
// pDst[i]->data + (pQuery->pointsToRead - ret - pQuery->limit.offset) *
// pQuery->pSelectExpr[i].resBytes, ret * pQuery->pSelectExpr[i].resBytes);
// }
// }
pQuery->limit.offset = 0;
return ret;
} else {
......@@ -7940,7 +8016,11 @@ void vnodePrintQueryStatistics(SMeterQuerySupportObj *pSupporter) {
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
pSummary->tmpBufferInDisk = pSupporter->bufSize;
if (pSupporter->pResultBuf == NULL) {
pSummary->tmpBufferInDisk = 0;
} else {
pSummary->tmpBufferInDisk = getResBufSize(pSupporter->pResultBuf);
}
dTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo,
pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0);
......
......@@ -132,7 +132,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
pRuntimeEnv->pMeterObj = pMeterObj;
if (pMeterInfo[k].pMeterQInfo == NULL) {
pMeterInfo[k].pMeterQInfo = createMeterQueryInfo(pQuery, pSupporter->rawSKey, pSupporter->rawEKey);
pMeterInfo[k].pMeterQInfo = createMeterQueryInfo(pQuery, pMeterObj->sid, pSupporter->rawSKey, pSupporter->rawEKey);
}
if (pMeterInfo[k].pMeterObj == NULL) { // no data in disk for this meter, set its pointer
......@@ -858,7 +858,9 @@ static void doOrderedScan(SQInfo *pQInfo) {
static void setupMeterQueryInfoForSupplementQuery(SMeterQuerySupportObj *pSupporter) {
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo;
changeMeterQueryInfoForSuppleQuery(pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
changeMeterQueryInfoForSuppleQuery(pResultBuf, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
}
}
......
......@@ -346,6 +346,14 @@ static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) {
// pTrace("key:%s %p add to hash table", key, pNode);
}
int32_t taosNumElemsInHashTable(HashObj *pObj) {
if (pObj == NULL) {
return 0;
}
return pObj->size;
}
/**
* add data node into hash table
* @param pObj hash object
......@@ -392,7 +400,7 @@ int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void
return 0;
}
char *taosGetDataFromHash(HashObj *pObj, const char *key, uint32_t keyLen) {
char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
if (pObj->multithreadSafe) {
__rd_lock(&pObj->lock);
}
......
......@@ -135,11 +135,11 @@ static bool allocFlushoutInfoEntries(SFileInfo *pFileMeta) {
}
static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
/*
* the in-mem buffer is full.
* To flush data to disk to accommodate more data
*/
if (pMemBuffer->numOfInMemPages > 0 && pMemBuffer->numOfInMemPages == pMemBuffer->inMemCapacity) {
/*
* the in-mem buffer is full.
* To flush data to disk to accommodate more data
*/
if (!tExtMemBufferFlush(pMemBuffer)) {
return false;
}
......@@ -147,7 +147,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
/*
* We do not recycle the file page structure. And in flush data operations, all
* filepage that are full of data are destroyed after data being flushed to disk.
* file page that are full of data are destroyed after data being flushed to disk.
*
* The memory buffer pages may be recycle in order to avoid unnecessary memory
* allocation later.
......@@ -189,9 +189,9 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow
pLast = pMemBuffer->pTail;
}
if (pLast->item.numOfElems + numOfRows <= pMemBuffer->numOfElemsPerPage) {
// enough space for records
if (pLast->item.numOfElems + numOfRows <= pMemBuffer->numOfElemsPerPage) { // enough space for records
tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRows, numOfRows);
pMemBuffer->numOfElemsInBuffer += numOfRows;
pMemBuffer->numOfTotalElems += numOfRows;
} else {
......@@ -205,8 +205,7 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow
int32_t remain = numOfRows - numOfRemainEntries;
while (remain > 0) {
if (!tExtMemBufferAlloc(pMemBuffer)) {
// failed to allocate memory buffer
if (!tExtMemBufferAlloc(pMemBuffer)) { // failed to allocate memory buffer
return -1;
}
......@@ -252,7 +251,7 @@ static bool tExtMemBufferUpdateFlushoutInfo(tExtMemBuffer *pMemBuffer) {
pFlushoutInfo->numOfPages = pMemBuffer->numOfInMemPages;
pFileMeta->flushoutData.nLength += 1;
} else {
// always update the first flushout array in single_flush_model
// always update the first flush out array in single_flush_model
pFileMeta->flushoutData.nLength = 1;
tFlushoutInfo *pFlushoutInfo = &pFileMeta->flushoutData.pFlushoutInfo[0];
pFlushoutInfo->numOfPages += pMemBuffer->numOfInMemPages;
......@@ -320,9 +319,7 @@ void tExtMemBufferClear(tExtMemBuffer *pMemBuffer) {
return;
}
/*
* release all data in memory buffer
*/
//release all data in memory buffer
tFilePagesItem *first = pMemBuffer->pHead;
while (first != NULL) {
tFilePagesItem *ptmp = first;
......@@ -335,6 +332,7 @@ void tExtMemBufferClear(tExtMemBuffer *pMemBuffer) {
pMemBuffer->numOfElemsInBuffer = 0;
pMemBuffer->numOfInMemPages = 0;
pMemBuffer->pHead = NULL;
pMemBuffer->pTail = NULL;
......@@ -586,7 +584,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f);
int32_t colIdx = pDescriptor->orderIdx.pData[0];
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].type, "before", startx, midx, endx);
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx);
#endif
if (compareFn(pDescriptor, numOfRows, midIdx, start, data) == 1) {
......@@ -607,7 +605,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f);
startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f);
endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f);
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].type, "after", startx, midx, endx);
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "after", startx, midx, endx);
#endif
}
......@@ -661,15 +659,15 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
}
#ifdef _DEBUG_VIEW
printf("before sort:\n");
tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
// printf("before sort:\n");
// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
#endif
int32_t s = start, e = end;
median(pDescriptor, numOfRows, start, end, data, compareFn);
#ifdef _DEBUG_VIEW
printf("%s called: %d\n", __FUNCTION__, qsort_call++);
// printf("%s called: %d\n", __FUNCTION__, qsort_call++);
#endif
UNUSED(qsort_call);
......@@ -695,7 +693,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
}
#ifdef _DEBUG_VIEW
tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
#endif
while (s < e) {
......@@ -714,7 +712,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
swap(pDescriptor->pColumnModel, numOfRows, s, data, e);
}
#ifdef _DEBUG_VIEW
tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
#endif
}
......@@ -731,7 +729,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
rightx += (end - end_same);
#ifdef _DEBUG_VIEW
tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
#endif
}
......@@ -748,7 +746,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
leftx -= (start_same - start);
#ifdef _DEBUG_VIEW
tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1);
#endif
}
......
#include "hash.h"
#include "taoserror.h"
#include "textbuffer.h"
#include "tlog.h"
#include "tsqlfunction.h"
#include "tresultBuf.h"
#define DEFAULT_INTERN_BUF_SIZE 16384L
int32_t createResultBuf(SQueryResultBuf** pResultBuf, int32_t size, int32_t rowSize) {
SQueryResultBuf* pResBuf = calloc(1, sizeof(SQueryResultBuf));
pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize;
pResBuf->numOfPages = size;
pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE;
pResBuf->incStep = 4;
// init id hash table
pResBuf->idsTable = taosInitHashTable(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->list = calloc(size, sizeof(SIDList));
pResBuf->numOfAllocGroupIds = size;
char path[4096] = {0};
getTmpfilePath("tsdb_q_buf", path);
pResBuf->path = strdup(path);
pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666);
memset(path, 0, tListLen(path));
getTmpfilePath("tsdb_q_i", path);
pResBuf->internpath = strdup(path);
pResBuf->internfd = open(pResBuf->internpath, O_CREAT|O_RDWR, 0666);
if (!FD_VALID(pResBuf->fd)) {
pError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno));
return TSDB_CODE_CLI_NO_DISKSPACE;
}
int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE);
if (ret != TSDB_CODE_SUCCESS) {
pError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno));
return TSDB_CODE_CLI_NO_DISKSPACE;
}
pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0);
if (pResBuf->pBuf == MAP_FAILED) {
pError("QInfo:%p failed to map temp file: %s. %s", pResBuf->path, strerror(errno));
return TSDB_CODE_CLI_OUT_OF_MEMORY; // todo change error code
}
pTrace("create tmp file for output result, %s, " PRId64 "bytes", pResBuf->path, pResBuf->totalBufSize);
*pResultBuf = pResBuf;
return TSDB_CODE_SUCCESS;
}
tFilePage* getResultBufferPageById(SQueryResultBuf* pResultBuf, int32_t id) {
assert(id < pResultBuf->numOfPages && id >= 0);
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id);
}
int32_t getNumOfResultBufGroupId(SQueryResultBuf* pResultBuf) { return taosNumElemsInHashTable(pResultBuf->idsTable); }
int32_t getResBufSize(SQueryResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
static int32_t extendDiskFileSize(SQueryResultBuf* pResultBuf, int32_t numOfPages) {
assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE == pResultBuf->totalBufSize);
int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
pResultBuf->numOfPages += numOfPages;
/*
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE);
if (ret != 0) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
return -TSDB_CODE_SERV_NO_DISKSPACE;
}
pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE;
pResultBuf->pBuf = mmap(NULL, pResultBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return -TSDB_CODE_SERV_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static bool noMoreAvailablePages(SQueryResultBuf* pResultBuf) {
return (pResultBuf->allocateId == pResultBuf->numOfPages - 1);
}
static int32_t getGroupIndex(SQueryResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL);
char* p = taosGetDataFromHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
return -1;
}
int32_t slot = GET_INT32_VAL(p);
assert(slot >= 0 && slot < pResultBuf->numOfAllocGroupIds);
return slot;
}
static int32_t addNewGroupId(SQueryResultBuf* pResultBuf, int32_t groupId) {
int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
if (pResultBuf->numOfAllocGroupIds <= num) {
size_t n = pResultBuf->numOfAllocGroupIds << 1u;
SIDList* p = (SIDList*)realloc(pResultBuf->list, sizeof(SIDList) * n);
assert(p != NULL);
memset(&p[pResultBuf->numOfAllocGroupIds], 0, sizeof(SIDList) * pResultBuf->numOfAllocGroupIds);
pResultBuf->list = p;
pResultBuf->numOfAllocGroupIds = n;
}
taosAddToHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
return num;
}
static int32_t doRegisterId(SIDList* pList, int32_t id) {
if (pList->size >= pList->alloc) {
int32_t s = 0;
if (pList->alloc == 0) {
s = 4;
assert(pList->pData == NULL);
} else {
s = pList->alloc << 1u;
}
int32_t* c = realloc(pList->pData, s * sizeof(int32_t));
assert(c);
memset(&c[pList->alloc], 0, sizeof(int32_t) * pList->alloc);
pList->pData = c;
pList->alloc = s;
}
pList->pData[pList->size++] = id;
return 0;
}
static void registerPageId(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
slot = addNewGroupId(pResultBuf, groupId);
}
SIDList* pList = &pResultBuf->list[slot];
doRegisterId(pList, pageId);
}
tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
if (noMoreAvailablePages(pResultBuf)) {
if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) {
return NULL;
}
}
// register new id in this group
*pageId = (pResultBuf->allocateId++);
registerPageId(pResultBuf, groupId, *pageId);
return getResultBufferPageById(pResultBuf, *pageId);
}
int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
SIDList getDataBufPagesIdList(SQueryResultBuf* pResultBuf, int32_t groupId) {
SIDList list = {0};
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
return list;
} else {
return pResultBuf->list[slot];
}
}
void destroyResultBuf(SQueryResultBuf* pResultBuf) {
if (pResultBuf == NULL) {
return;
}
if (FD_VALID(pResultBuf->fd)) {
close(pResultBuf->fd);
}
pTrace("disk-based output buffer closed, %" PRId64 " bytes, file:%s", pResultBuf->totalBufSize, pResultBuf->path);
munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
unlink(pResultBuf->path);
tfree(pResultBuf->path);
for (int32_t i = 0; i < pResultBuf->numOfAllocGroupIds; ++i) {
SIDList* pList = &pResultBuf->list[i];
tfree(pList->pData);
}
tfree(pResultBuf->list);
taosCleanUpHashTable(pResultBuf->idsTable);
tfree(pResultBuf);
}
int32_t getLastPageId(SIDList *pList) {
  if (pList == NULL || pList->size <= 0) {
return -1;
}
return pList->pData[pList->size - 1];
}
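A write-side lifecycle sketch to complement the reader-side example given after the header: the initial page count (32), row size (64), and group id are placeholder values, the included header names are taken from this file's own includes, and error handling is reduced to early returns.

#include <assert.h>
#include "taoserror.h"
#include "tresultBuf.h"

// illustrative: create the buffer, obtain one page for a group, then tear everything down
static int32_t resultBufLifecycleSketch(void) {
  SQueryResultBuf* pResultBuf = NULL;

  // at least one page per expected group; 32 pages of 64-byte rows is an arbitrary starting size
  if (createResultBuf(&pResultBuf, 32, 64) != TSDB_CODE_SUCCESS) {
    return -1;
  }

  int32_t groupId = 1;
  int32_t pageId = -1;

  // extends the mmap'ed file on demand and records pageId in the group's SIDList
  tFilePage* pPage = getNewDataBuf(pResultBuf, groupId, &pageId);
  if (pPage == NULL) {
    destroyResultBuf(pResultBuf);
    return -1;
  }

  // a freshly allocated page starts empty; the caller fills pPage->data and maintains numOfElems
  assert(pPage->numOfElems == 0);

  SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
  assert(getLastPageId(&list) == pageId);

  destroyResultBuf(pResultBuf);
  return TSDB_CODE_SUCCESS;
}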
......@@ -274,7 +274,7 @@ int tSQLKeywordCode(const char* z, int n) {
}
}
SKeyword** pKey = (SKeyword**)taosGetDataFromHash(KeywordHashTable, key, n);
SKeyword** pKey = (SKeyword**)taosGetDataFromHashTable(KeywordHashTable, key, n);
if (pKey != NULL) {
return (*pKey)->type;
} else {
......