提交 d2ed6ff0 编写于 作者: H Haojun Liao

fix(query): set correct page buffer id.

上级 ba8b10e2
...@@ -297,10 +297,11 @@ enum { ...@@ -297,10 +297,11 @@ enum {
}; };
typedef struct SAggSupporter { typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // current write page id
} SAggSupporter; } SAggSupporter;
typedef struct { typedef struct {
...@@ -429,6 +430,7 @@ typedef struct SStreamAggSupporter { ...@@ -429,6 +430,7 @@ typedef struct SStreamAggSupporter {
char* pKeyBuf; // window key buffer char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // buffer page that is active
SSDataBlock* pScanBlock; SSDataBlock* pScanBlock;
} SStreamAggSupporter; } SStreamAggSupporter;
...@@ -991,7 +993,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary ...@@ -991,7 +993,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size); int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
......
...@@ -179,24 +179,21 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR ...@@ -179,24 +179,21 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
} }
#endif #endif
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) { SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL; SFilePage* pData = NULL;
// in the first scan, new space needed for results // in the first scan, new space needed for results
int32_t pageId = -1; int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf); if (*currentPageId == -1) {
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, &pageId); pData = getNewBufPage(pResultBuf, &pageId);
pData->num = sizeof(SFilePage); pData->num = sizeof(SFilePage);
} else { } else {
SPageInfo* pi = getLastPageInfo(list); pData = getBufPage(pResultBuf, *currentPageId);
pData = getBufPage(pResultBuf, getPageId(pi)); pageId = *currentPageId;
pageId = getPageId(pi);
if (pData->num + interBufSize > getBufPageSize(pResultBuf)) { if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one // release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi); releaseBufPage(pResultBuf, pData);
pData = getNewBufPage(pResultBuf, &pageId); pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) { if (pData != NULL) {
...@@ -215,9 +212,9 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int ...@@ -215,9 +212,9 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int
SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num); SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
pResultRow->pageId = pageId; pResultRow->pageId = pageId;
pResultRow->offset = (int32_t)pData->num; pResultRow->offset = (int32_t)pData->num;
*currentPageId = pageId;
pData->num += interBufSize; pData->num += interBufSize;
return pResultRow; return pResultRow;
} }
...@@ -263,11 +260,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -263,11 +260,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// allocate a new buffer page // allocate a new buffer page
if (pResult == NULL) { if (pResult == NULL) {
#ifdef BUF_PAGE_DEBUG
qDebug("page_2");
#endif
ASSERT(pSup->resultRowSize > 0); ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize); pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
initResultRow(pResult); initResultRow(pResult);
...@@ -3093,7 +3087,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { ...@@ -3093,7 +3087,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
offset += sizeof(int32_t); offset += sizeof(int32_t);
uint64_t tableGroupId = *(uint64_t*)(result + offset); uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (!resultRow) { if (!resultRow) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
...@@ -3442,6 +3436,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -3442,6 +3436,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
const char* pKey) { const char* pKey) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->currentPageId = -1;
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
...@@ -4678,6 +4673,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf ...@@ -4678,6 +4673,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size) { int32_t size) {
pSup->currentPageId = -1;
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
......
...@@ -3790,7 +3790,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes ...@@ -3790,7 +3790,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
} }
if (pWinInfo->pos.pageId == -1) { if (pWinInfo->pos.pageId == -1) {
*pResult = getNewResultRow(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize); *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
if (*pResult == NULL) { if (*pResult == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册