提交 b4fce76b 编写于 作者: H Haojun Liao

[td-225] fix bugs in query. and refactor some codes.

上级 d6bc3261
......@@ -31,8 +31,8 @@ extern "C" {
#include "tutil.h"
#include "qExecutor.h"
#include "qSqlparser.h"
#include "qTsbuf.h"
#include "qsqlparser.h"
#include "tcmdtype.h"
// forward declaration
......
......@@ -148,7 +148,7 @@ void taos_init_imp() {
refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscCacheHandle == NULL) {
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "client");
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
}
tscDebug("client is initialized successfully");
......
......@@ -20,8 +20,8 @@
#include "hash.h"
#include "qFill.h"
#include "qResultbuf.h"
#include "qSqlparser.h"
#include "qTsbuf.h"
#include "qsqlparser.h"
#include "query.h"
#include "taosdef.h"
#include "tarray.h"
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEQUERYUTIL_H
#define TDENGINE_VNODEQUERYUTIL_H
#ifndef TDENGINE_QRESULTBUF_H
#define TDENGINE_QRESULTBUF_H
#ifdef __cplusplus
extern "C" {
......@@ -26,11 +26,18 @@ extern "C" {
typedef struct SArray* SIDList;
typedef struct SPageInfo {
int32_t pageId;
int32_t offset;
int32_t lengthOnDisk;
} SPageInfo;
typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
int32_t fd; // data file fd
FILE* file;
// int32_t fd; // data file fd
int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages
void* pBuf; // mmap buffer pointer
......@@ -43,6 +50,8 @@ typedef struct SDiskbasedResultBuf {
void* iBuf; // inmemory buf
void* handle; // for debug purpose
void* emptyDummyIdList; // dummy id list
bool comp;
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
......@@ -56,7 +65,7 @@ typedef struct SDiskbasedResultBuf {
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize, int32_t pagesize,
int32_t inMemPages, void* handle);
int32_t inMemPages, const void* handle);
/**
*
......@@ -126,4 +135,4 @@ int32_t getLastPageId(SIDList pList);
}
#endif
#endif // TDENGINE_VNODEQUERYUTIL_H
#endif // TDENGINE_QRESULTBUF_H
......@@ -18,8 +18,8 @@
#include "exception.h"
#include "qAst.h"
#include "qSqlparser.h"
#include "qSyntaxtreefunction.h"
#include "qsqlparser.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
......
......@@ -6617,14 +6617,16 @@ void* qOpenQueryMgmt(int32_t vgId) {
char cacheName[128] = {0};
sprintf(cacheName, "qhandle_%d", vgId);
SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt));
SQueryMgmt* pQueryMgmt = calloc(1, sizeof(SQueryMgmt));
pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryHandle->closed = false;
pthread_mutex_init(&pQueryHandle->lock, NULL);
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryMgmt->closed = false;
pQueryMgmt->vgId = vgId;
pthread_mutex_init(&pQueryMgmt->lock, NULL);
qDebug("vgId:%d, open querymgmt success", vgId);
return pQueryHandle;
return pQueryMgmt;
}
static void queryMgmtKillQueryFn(void* handle) {
......@@ -6664,7 +6666,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
pthread_mutex_destroy(&pQueryMgmt->lock);
tfree(pQueryMgmt);
qDebug("vgId:%d querymgmt cleanup completed", vgId);
qDebug("vgId:%d queryMgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
......
......@@ -14,7 +14,7 @@
*/
#include "os.h"
#include "qsqlparser.h"
#include "qSqlparser.h"
#include "queryLog.h"
#include "taosdef.h"
#include "taosmsg.h"
......
......@@ -5,7 +5,7 @@
#include "taoserror.h"
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t numOfPages, int32_t rowSize,
int32_t pagesize, int32_t inMemPages, void* handle) {
int32_t pagesize, int32_t inMemPages, const void* handle) {
*pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf));
SDiskbasedResultBuf* pResBuf = *pResultBuf;
......@@ -24,6 +24,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
pResBuf->incStep = 4;
pResBuf->allocateId = -1;
// todo opt perf by on demand create in memory buffer
pResBuf->iBuf = calloc(pResBuf->inMemPages, pResBuf->pageSize);
// init id hash table
......@@ -31,10 +32,10 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
pResBuf->list = taosArrayInit(numOfPages, POINTER_BYTES);
char path[PATH_MAX] = {0};
getTmpfilePath("tsdb_qbuf", path);
getTmpfilePath("qbuf", path);
pResBuf->path = strdup(path);
pResBuf->fd = FD_INITIALIZER;
pResBuf->file = NULL;
pResBuf->pBuf = NULL;
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
......@@ -52,8 +53,9 @@ int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->tota
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
if (!FD_VALID(pResultBuf->fd)) {
// pResultBuf->fd = open(pResultBuf->path, O_CREAT | O_RDWR, 0666);
pResultBuf->file = fopen(pResultBuf->path, "r+");
if (pResultBuf->file == NULL) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
......@@ -61,13 +63,15 @@ static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
assert(pResultBuf->numOfPages == pResultBuf->inMemPages);
pResultBuf->numOfPages += pResultBuf->incStep;
int32_t ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
int32_t ret = ftruncate(fileno(pResultBuf->file), NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) {
qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED,
fileno(pResultBuf->file), 0);
if (pResultBuf->pBuf == MAP_FAILED) {
qError("QInfo:%p failed to map temp file: %s. %s", pResultBuf->handle, pResultBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
......@@ -82,7 +86,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
int32_t ret = TSDB_CODE_SUCCESS;
if (pResultBuf->pBuf == NULL) {
assert(pResultBuf->fd == FD_INITIALIZER);
assert(pResultBuf->file == NULL);
if ((ret = createDiskResidesBuf(pResultBuf)) != TSDB_CODE_SUCCESS) {
return ret;
......@@ -95,7 +99,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
ret = ftruncate(pResultBuf->fd, NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
ret = ftruncate(fileno(pResultBuf->file), NUM_OF_PAGES_ON_DISK(pResultBuf) * pResultBuf->pageSize);
if (ret != TSDB_CODE_SUCCESS) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
......@@ -103,7 +107,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t incNu
}
pResultBuf->totalBufSize = pResultBuf->numOfPages * pResultBuf->pageSize;
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
pResultBuf->pBuf = mmap(NULL, FILE_SIZE_ON_DISK(pResultBuf), PROT_READ | PROT_WRITE, MAP_SHARED, fileno(pResultBuf->file), 0);
if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
......@@ -185,11 +189,11 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
return;
}
if (FD_VALID(pResultBuf->fd)) {
if (pResultBuf->file != NULL) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file created:%s, file size:%d", handle,
pResultBuf->totalBufSize, pResultBuf->path, FILE_SIZE_ON_DISK(pResultBuf));
close(pResultBuf->fd);
fclose(pResultBuf->file);
munmap(pResultBuf->pBuf, FILE_SIZE_ON_DISK(pResultBuf));
pResultBuf->pBuf = NULL;
} else {
......
......@@ -30,7 +30,7 @@
#include <string.h>
#include <assert.h>
#include <stdbool.h>
#include "qsqlparser.h"
#include "qSqlparser.h"
#include "tcmdtype.h"
#include "tstoken.h"
#include "ttokendef.h"
......
......@@ -654,8 +654,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
int64_t elapsedTime = (taosGetTimestampUs() - st);
pQueryHandle->cost.blockLoadTime += elapsedTime;
tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, elapsedTime);
tsdbDebug("%p load file block into buffer, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us",
pQueryHandle, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime);
return blockLoaded;
}
......@@ -971,6 +972,52 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
}
}
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) {
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
return;
}
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
}
static void getQualifiedRowsPos(STsdbQueryHandle* pQueryHandle, int32_t startPos, int32_t endPos,
int32_t numOfExisted, int32_t *start, int32_t *end) {
*start = -1;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
int32_t remain = endPos - startPos + 1;
if (remain + numOfExisted > pQueryHandle->outputCapacity) {
*end = (pQueryHandle->outputCapacity - numOfExisted) + startPos - 1;
}
*start = startPos;
} else {
int32_t remain = (startPos - endPos) + 1;
if (remain + numOfExisted > pQueryHandle->outputCapacity) {
*end = startPos + 1 - (pQueryHandle->outputCapacity - numOfExisted);
}
*start = *end;
*end = startPos;
}
}
static void updateInfoAfterMerge(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) {
SQueryFilePos* cur = &pQueryHandle->cur;
pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
cur->pos = endPos;
}
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
......@@ -978,7 +1025,10 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
TSKEY* tsArray = pCols->cols[0].pData;
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
......@@ -1004,9 +1054,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t pos = cur->pos;
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
TSKEY* tsArray = pCols->cols[0].pData;
int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
......@@ -1014,34 +1061,22 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
int32_t start = cur->pos;
int32_t end = endPos;
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
end = cur->pos;
start = endPos;
SWAP(start, end, int32_t);
}
cur->win.skey = tsArray[start];
cur->win.ekey = tsArray[end];
// todo opt in case of no data in buffer
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
pos += (end - start + 1) * step;
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
return;
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL;
......@@ -1087,27 +1122,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
moveToNextRowInMem(pCheckInfo);
}
int32_t start = -1;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
int32_t remain = end - pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
}
start = pos;
} else {
int32_t remain = (pos - end) + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
}
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
start = end;
end = pos;
}
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step;
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = tsArray[end];
cur->lastKey = cur->win.ekey + step;
}
} while (numOfRows < pQueryHandle->outputCapacity);
......@@ -1124,30 +1147,14 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->win.skey = tsArray[pos];
}
int32_t start = -1;
int32_t end = -1;
// all remain data are qualified, but check the remain capacity in the first place.
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
int32_t remain = endPos - pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
}
start = pos;
end = endPos;
} else {
int32_t remain = pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
}
start = endPos;
end = pos;
}
int32_t start = -1, end = -1;
getQualifiedRowsPos(pQueryHandle, pos, endPos, numOfRows, &start, &end);
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = tsArray[end];
cur->lastKey = cur->win.ekey + step;
}
}
}
......@@ -1157,21 +1164,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
}
pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
cur->pos = pos;
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(cur->win.skey >= pQueryHandle->window.skey && cur->win.ekey <= pQueryHandle->window.ekey);
} else {
assert(cur->win.skey >= pQueryHandle->window.ekey && cur->win.ekey <= pQueryHandle->window.skey);
}
tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
......
......@@ -674,6 +674,7 @@ void* taosCacheTimedRefresh(void *handle) {
// check if current cache object will be deleted every 500ms.
if (pCacheObj->deleting) {
uDebug("%s refresh threads quit", pCacheObj->name);
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册