提交 57b49262 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 8f14f8e8
......@@ -399,8 +399,8 @@ typedef struct SOptrBasicInfo {
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
// SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
// SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
......
......@@ -324,11 +324,16 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
}
if (newCapacity == pResultRowInfo->capacity) {
if (newCapacity <= pResultRowInfo->capacity) {
newCapacity += 4;
}
pResultRowInfo->pPosition = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
char* p = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
if (p == NULL) {
longjmp(env, TSDB_CODE_OUT_OF_MEMORY);
}
pResultRowInfo->pPosition = (SResultRowPosition*)p;
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc);
......@@ -419,88 +424,56 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
* | 8 bytes | actual length |
* +----------+---------------+
*/
static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid,
static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) {
bool existInCurrentResusltRowInfo = false;
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = NULL;
// in case of repeat scan/reverse scan, no new time window added.
if (isIntervalQuery) {
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
if (p1 != NULL) {
return getResultRowByPos(pResultBuf, p1);
} else {
return NULL;
}
}
if (p1 != NULL) {
if (pResultRowInfo->size == 0) {
existInCurrentResusltRowInfo =
false; // this time window created by other timestamp that does not belongs to current table.
} else if (pResultRowInfo->size == 1) {
SResultRowPosition* p = &pResultRowInfo->pPosition[0];
existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset);
} else { // check if current pResultRowInfo contains the existInCurrentResusltRowInfo pResultRow
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
if (index != NULL) {
// TODO check the scan order for current opened time window
existInCurrentResusltRowInfo = true;
} else {
existInCurrentResusltRowInfo = false;
}
}
if (masterscan && p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
pResult = getResultRowByPos(pResultBuf, p1);
}
} else {
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
// pResultRowInfo object.
if (p1 != NULL) {
return getResultRowByPos(pResultBuf, p1);
pResult = getResultRowByPos(pResultBuf, p1);
}
}
SResultRow* pResult = NULL;
if (!existInCurrentResusltRowInfo) {
// 1. close current opened time window
if (pResultRowInfo->cur.pageId != -1) { // todo extract function
SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset);
closeResultRow(pRow);
releaseBufPage(pResultBuf, pPage);
}
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
if (p1 == NULL) {
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize);
initResultRow(pResult);
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
sizeof(SResultRowPosition));
SResultRowCell cell = {.groupId = groupId, .pos = pos};
taosArrayPush(pSup->pResultRowArrayList, &cell);
} else {
pResult = getResultRowByPos(pResultBuf, p1);
}
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId &&
pResult->offset != pResultRowInfo->cur.offset))) {
// todo extract function
SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset);
closeResultRow(pRow);
releaseBufPage(pResultBuf, pPage);
}
// allocate a new buffer page
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
if (pResult == NULL) {
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize);
initResultRow(pResult);
// 2. set the new time window to be the new active time window
pResultRowInfo->pPosition[pResultRowInfo->size++] =
(SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &pResultRowInfo->cur,
POINTER_BYTES);
} else {
pResult = getResultRowByPos(pResultBuf, p1);
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition));
}
// 2. set the new time window to be the new active time window
pResultRowInfo->pPosition[pResultRowInfo->size++] =
(SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
// too many time window in query
if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
......@@ -656,7 +629,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_
int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter* pAggSup,
SExecTaskInfo* pTaskInfo) {
assert(win->skey <= win->ekey);
SResultRow* pResultRow = doSetResultOutBufByKey_rv(pAggSup->pResultBuf, pResultRowInfo, id, (char*)&win->skey,
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, id, (char*)&win->skey,
TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup);
if (pResultRow == NULL) {
......@@ -1707,7 +1680,7 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = binfo->pCtx;
SResultRow* pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId,
SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId,
pTaskInfo, false, pAggSup);
assert(pResultRow != NULL);
......@@ -2713,7 +2686,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
int64_t tid = 0;
int64_t groupId = 0;
SResultRow* pRow = doSetResultOutBufByKey_rv(pSup->pResultBuf, pResultRowInfo, tid, (char*)&tid, sizeof(tid), true,
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, tid, (char*)&tid, sizeof(tid), true,
groupId, pTaskInfo, false, pSup);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
......@@ -3001,7 +2974,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
SResultRow* pResultRow =
doSetResultOutBufByKey_rv(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId),
doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId),
true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
assert(pResultRow != NULL);
......@@ -4895,8 +4868,8 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
initResultRow(resultRow);
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
// pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size;
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] =
(SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
// pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] =
// (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
}
......@@ -5609,10 +5582,10 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
// pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
// pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL ||
if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ ||
pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -5628,8 +5601,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
static void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf);
taosHashCleanup(pAggSup->pResultRowHashTable);
taosHashCleanup(pAggSup->pResultRowListSet);
taosArrayDestroy(pAggSup->pResultRowArrayList);
// taosHashCleanup(pAggSup->pResultRowListSet);
// taosArrayDestroy(pAggSup->pResultRowArrayList);
destroyDiskbasedBuf(pAggSup->pResultBuf);
}
......@@ -6352,7 +6325,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL) {
if (pDataReader == NULL && terrno != 0) {
return NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册