提交 98ba80a9 编写于 作者: H Haojun Liao

[TD-225] refactor codes.

上级 d0e85228
...@@ -80,5 +80,7 @@ void* destroyResultRowPool(SResultRowPool* p); ...@@ -80,5 +80,7 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p); int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p); int32_t getNumOfUsedResultRows(SResultRowPool* p);
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
...@@ -186,7 +186,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, ...@@ -186,7 +186,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData,
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery); static bool hasMainOutput(SQuery *pQuery);
static void buildTagQueryResult(SQInfo *pQInfo); static void buildTagQueryResult(SQInfo *pQInfo);
...@@ -283,8 +283,8 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) { ...@@ -283,8 +283,8 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) {
} }
} }
static UNUSED_FUNC int32_t getGroupResultId(int32_t groupIndex) { static int32_t getMergeResultGroupId(int32_t groupIndex) {
int32_t base = 20000000; int32_t base = 50000000;
return base + (groupIndex * 10000); return base + (groupIndex * 10000);
} }
...@@ -1115,7 +1115,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat ...@@ -1115,7 +1115,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// not assign result buffer yet, add new result buffer // not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData; char* d = pData;
int16_t len = bytes; int16_t len = bytes;
if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) {
...@@ -1128,7 +1128,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat ...@@ -1128,7 +1128,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
uint64_t uid = groupIndex; // uid is always set to be 0. uint64_t uid = groupIndex;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid); SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid);
if (pResultRow == NULL) { if (pResultRow == NULL) {
return -1; return -1;
...@@ -1714,7 +1714,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1714,7 +1714,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
// if it is group by normal column, do not set output buffer, the output buffer is pResult // if it is group by normal column, do not set output buffer, the output buffer is pResult
// fixed output query/multi-output query for normal table // fixed output query/multi-output query for normal table
if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
resetCtxOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
} }
if (setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx) != TSDB_CODE_SUCCESS) { if (setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx) != TSDB_CODE_SUCCESS) {
...@@ -2936,10 +2936,24 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2936,10 +2936,24 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
assert(size == pGroupResInfo->numOfDataPages); assert(size == pGroupResInfo->numOfDataPages);
bool done = false; bool done = false;
//TODO add API for release none-dirty pages
// SPageInfo* prev = NULL;
for (int32_t j = pGroupResInfo->pageId; j < size; ++j) { for (int32_t j = pGroupResInfo->pageId; j < size; ++j) {
SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j); SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j);
tFilePage* pData = getResBufPage(pResultBuf, pi->pageId); tFilePage* pData = getResBufPage(pResultBuf, pi->pageId);
// release previous buffer pages
// if (prev == NULL) {
// prev = pi;
// } else {
// if (prev->pageId != pi->pageId) {
// releaseResBufPageInfo(pResultBuf, prev);
// prev = pi;
// }
// }
assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num); assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num);
int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId); int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId);
...@@ -3058,7 +3072,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -3058,7 +3072,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SResultRow* pRow = getNewResultRow(pRuntimeEnv->pool); SResultRow* pRow = getNewResultRow(pRuntimeEnv->pool);
resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow); resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow);
pQInfo->groupResInfo.groupId = getGroupResultId(pQInfo->groupIndex); pQInfo->groupResInfo.groupId = getMergeResultGroupId(pQInfo->groupIndex);
// todo add windowRes iterator // todo add windowRes iterator
int64_t lastTimestamp = -1; int64_t lastTimestamp = -1;
...@@ -3339,12 +3353,12 @@ int32_t initResultRow(SResultRow *pResultRow) { ...@@ -3339,12 +3353,12 @@ int32_t initResultRow(SResultRow *pResultRow) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t groupIndex = 0; int32_t tid = 0;
int32_t uid = 0; int64_t uid = getResultInfoUId(pRuntimeEnv);
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid); SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
...@@ -3427,7 +3441,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3427,7 +3441,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->limit.offset -= pQuery->rec.rows; pQuery->limit.offset -= pQuery->rec.rows;
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
resetCtxOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
// clear the buffer full flag if exists // clear the buffer full flag if exists
CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL); CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL);
...@@ -3792,7 +3806,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { ...@@ -3792,7 +3806,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
return; return;
} }
uint64_t uid = 0; // uid is always set to be 0 uint64_t uid = getResultInfoUId(pRuntimeEnv);
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
sizeof(groupIndex), true, uid); sizeof(groupIndex), true, uid);
if (pResultRow == NULL) { if (pResultRow == NULL) {
...@@ -4629,10 +4643,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4629,10 +4643,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
int32_t ps = DEFAULT_PAGE_SIZE; int32_t ps = DEFAULT_PAGE_SIZE;
int32_t rowsize = 0; int32_t rowsize = 0;
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
int32_t TWOMB = 1024*1024*2; int32_t TENMB = 1024*1024*10;
if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) { if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) {
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4660,7 +4674,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4660,7 +4674,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) { } else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) {
int32_t numOfResultRows = getInitialPageNum(pQInfo); int32_t numOfResultRows = getInitialPageNum(pQInfo);
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4930,7 +4944,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4930,7 +4944,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
if (isPointInterpoQuery(pQuery)) { if (isPointInterpoQuery(pQuery)) {
resetCtxOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
...@@ -5096,7 +5110,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5096,7 +5110,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
return; return;
} }
resetCtxOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
SArray *group = GET_TABLEGROUP(pQInfo, 0); SArray *group = GET_TABLEGROUP(pQInfo, 0);
...@@ -5456,7 +5470,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5456,7 +5470,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
// for ts_comp query, re-initialized is not allowed // for ts_comp query, re-initialized is not allowed
if (!isTSCompQuery(pQuery)) { if (!isTSCompQuery(pQuery)) {
resetCtxOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
} }
// skip blocks without load the actual data block from file if no filter condition present // skip blocks without load the actual data block from file if no filter condition present
...@@ -5486,7 +5500,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -5486,7 +5500,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
qDebug("QInfo:%p skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64, qDebug("QInfo:%p skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64,
pQInfo, pQuery->limit.offset, pQuery->current->lastKey, pQuery->current->win.ekey); pQInfo, pQuery->limit.offset, pQuery->current->lastKey, pQuery->current->win.ekey);
resetCtxOutputBuf(pRuntimeEnv); resetDefaultResInfoOutputBuf(pRuntimeEnv);
} }
limitResults(pRuntimeEnv); limitResults(pRuntimeEnv);
......
...@@ -407,7 +407,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { ...@@ -407,7 +407,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
} }
if (pResultBuf->file != NULL) { if (pResultBuf->file != NULL) {
qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f", qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb",
pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0, pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0,
pResultBuf->fileSize/1024.0); pResultBuf->fileSize/1024.0);
......
...@@ -20,6 +20,18 @@ ...@@ -20,6 +20,18 @@
#include "qExecutor.h" #include "qExecutor.h"
#include "qUtil.h" #include "qUtil.h"
static int32_t getResultRowKeyInfo(SResultRow* pResult, int16_t type, char** key, int16_t* bytes) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
*key = varDataVal(pResult->key);
*bytes = varDataLen(pResult->key);
} else {
*key = (char*) &pResult->win.skey;
*bytes = tDataTypeDesc[type].nSize;
}
return 0;
}
int32_t getOutputInterResultBufSize(SQuery* pQuery) { int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0; int32_t size = 0;
...@@ -94,12 +106,8 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -94,12 +106,8 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
assert(num >= 0 && num <= numOfClosed); assert(num >= 0 && num <= numOfClosed);
int16_t type = pWindowResInfo->type; int16_t type = pWindowResInfo->type;
STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable); // uid is always set to be 0. int64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = id->uid;
if (pRuntimeEnv->groupbyNormalCol) {
uid = 0;
}
char *key = NULL; char *key = NULL;
int16_t bytes = -1; int16_t bytes = -1;
...@@ -107,16 +115,7 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -107,16 +115,7 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pWindowResInfo->pResult[i];
if (pResult->closed) { // remove the window slot from hash table if (pResult->closed) { // remove the window slot from hash table
getResultRowKeyInfo(pResult, type, &key, &bytes);
// todo refactor
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
key = varDataVal(pResult->key);
bytes = varDataLen(pResult->key);
} else {
key = (char*) &pResult->win.skey;
bytes = tDataTypeDesc[pWindowResInfo->type].nSize;
}
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
} else { } else {
...@@ -141,16 +140,9 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -141,16 +140,9 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
for (int32_t k = 0; k < pWindowResInfo->size; ++k) { for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
SResultRow *pResult = pWindowResInfo->pResult[k]; SResultRow *pResult = pWindowResInfo->pResult[k];
getResultRowKeyInfo(pResult, type, &key, &bytes);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
key = varDataVal(pResult->key);
bytes = varDataLen(pResult->key);
} else {
key = (char*) &pResult->win.skey;
bytes = tDataTypeDesc[pWindowResInfo->type].nSize;
}
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
assert(p != NULL); assert(p != NULL);
...@@ -379,3 +371,12 @@ void* destroyResultRowPool(SResultRowPool* p) { ...@@ -379,3 +371,12 @@ void* destroyResultRowPool(SResultRowPool* p) {
tfree(p); tfree(p);
return NULL; return NULL;
} }
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) {
if (!pRuntimeEnv->stableQuery) {
return 0; // for simple table query, the uid is always set to be 0;
}
STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current);
return id->uid;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册