提交 63f3da03 编写于 作者: H Haojun Liao

[td-11818] refactor and fix bug.

上级 07a8f98b
...@@ -90,6 +90,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); ...@@ -90,6 +90,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(const SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
......
...@@ -37,6 +37,15 @@ typedef struct SFilePage { ...@@ -37,6 +37,15 @@ typedef struct SFilePage {
char data[]; char data[];
} SFilePage; } SFilePage;
typedef struct SDiskbasedBufStatis {
int64_t flushBytes;
int64_t loadBytes;
int32_t loadPages;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SDiskbasedBufStatis;
/** /**
* create disk-based result buffer * create disk-based result buffer
* @param pBuf * @param pBuf
...@@ -150,6 +159,11 @@ void setBufPageDirty(SFilePage* pPageInfo, bool dirty); ...@@ -150,6 +159,11 @@ void setBufPageDirty(SFilePage* pPageInfo, bool dirty);
*/ */
void printStatisBeforeClose(SDiskbasedBuf* pBuf); void printStatisBeforeClose(SDiskbasedBuf* pBuf);
/**
* return buf statistics.
*/
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -512,6 +512,35 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock) { ...@@ -512,6 +512,35 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock) {
return rowSize; return rowSize;
} }
/**
* @refitem blockDataToBuf for the meta size
*
* @param pBlock
* @return
*/
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t);
}
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL);
double rowSize = 0;
size_t numOfCols = pBlock->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
rowSize += pColInfo->info.bytes;
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
rowSize += sizeof(int32_t);
} else {
rowSize += 1/8.0;
}
}
return rowSize;
}
typedef struct SSDataBlockSortHelper { typedef struct SSDataBlockSortHelper {
SArray *orderInfo; // SArray<SBlockOrderInfo> SArray *orderInfo; // SArray<SBlockOrderInfo>
SSDataBlock *pDataBlock; SSDataBlock *pDataBlock;
......
...@@ -389,7 +389,7 @@ typedef struct SExchangeInfo { ...@@ -389,7 +389,7 @@ typedef struct SExchangeInfo {
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
SSDataBlock* pResult; SSDataBlock* pResult;
bool seqLoadData; bool seqLoadData; // sequential load data or not, false by default
int32_t current; int32_t current;
uint64_t totalSize; // total load bytes from remote uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
...@@ -591,6 +591,12 @@ typedef struct SOrderOperatorInfo { ...@@ -591,6 +591,12 @@ typedef struct SOrderOperatorInfo {
int32_t numOfRowsInRes; int32_t numOfRowsInRes;
SMsortComparParam cmpParam; SMsortComparParam cmpParam;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo; } SOrderOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
......
...@@ -358,7 +358,6 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) ...@@ -358,7 +358,6 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo)
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES); pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES);
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
} }
......
...@@ -5604,6 +5604,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5604,6 +5604,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
tfree(pInfo->currentGroupColData); tfree(pInfo->currentGroupColData);
} }
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList); taosArrayDestroy(pInfo->orderColumnList);
...@@ -5854,16 +5855,20 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou ...@@ -5854,16 +5855,20 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou
SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
char* pData = colDataGet(pSrcColInfo, *rowIndex);
colDataAppend(pColInfo, pBlock->info.rows, pData, isNull); if (isNull) {
colDataAppend(pColInfo, pBlock->info.rows, NULL, true);
} else {
char* pData = colDataGet(pSrcColInfo, *rowIndex);
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
}
} }
pBlock->info.rows += 1; pBlock->info.rows += 1;
*rowIndex += 1; *rowIndex += 1;
} }
static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) { static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, int32_t numOfCols) {
SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource));
if (pSource == NULL) { if (pSource == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
...@@ -5883,24 +5888,17 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) { ...@@ -5883,24 +5888,17 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) {
taosArrayPush(pSource->pBlock->pDataBlock, &colInfo); taosArrayPush(pSource->pBlock->pDataBlock, &colInfo);
} }
taosArrayPush(pInfo->pSources, &pSource); taosArrayPush(pAllSources, &pSource);
pInfo->sourceId += 1; pInfo->sourceId += 1;
pInfo->cmpParam.numOfSources += 1;
if (pInfo->cmpParam.numOfSources > getNumOfInMemBufPages(pInfo->pSortInternalBuf)) {
// TODO sort memory not enough, return with error code.
}
ASSERT(pInfo->cmpParam.numOfSources == taosArrayGetSize(pInfo->pSources)); int32_t rowSize = blockDataGetSerialRowSize(pSource->pBlock);
int32_t numOfRows = (getBufPageSize(pInfo->pSortInternalBuf) - blockDataGetSerialMetaSize(pInfo->pDataBlock))/rowSize;
int32_t rowSize = blockDataGetRowSize(pSource->pBlock);
int32_t numOfRows = getBufPageSize(pInfo->pSortInternalBuf)/rowSize;
return blockDataEnsureCapacity(pSource->pBlock, numOfRows); return blockDataEnsureCapacity(pSource->pBlock, numOfRows);
} }
void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, SArray* pSources, jmp_buf env) {
int32_t start = 0; int32_t start = 0;
while(start < pInfo->pDataBlock->info.rows) { while(start < pInfo->pDataBlock->info.rows) {
...@@ -5933,36 +5931,47 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { ...@@ -5933,36 +5931,47 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) {
int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
int32_t code = doAddNewSource(pInfo, numOfCols); int32_t code = doAddNewSource(pInfo, pSources, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(env, code); longjmp(env, code);
} }
} }
static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorInfo* pInfo) { static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SDiskbasedBuf* pBuf) {
cmpParam->pSources = pInfo->pSources->pData; cmpParam->pSources = taosArrayGet(pSources, startIndex);
cmpParam->numOfSources = (endIndex - startIndex + 1);
for(int32_t i = 0; i < pInfo->cmpParam.numOfSources; ++i) { for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i]; SExternalMemSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); SPageInfo* pPgInfo = *(SPageInfo**) taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); SFilePage* pPage = getBufPage(pBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data); int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
releaseBufPage(pInfo->pSortInternalBuf, pPage); releaseBufPage(pBuf, pPage);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo, SMsortComparParam* cmpParam) { static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
blockDataDestroy(pSource->pBlock);
tfree(pSource);
}
cmpParam->numOfSources = 0;
}
static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo, SMsortComparParam* cmpParam, int32_t capacity) {
blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
while(1) { while(1) {
if (pInfo->cmpParam.numOfSources == pInfo->numOfCompleted) { if (cmpParam->numOfSources == pInfo->numOfCompleted) {
break; break;
} }
...@@ -5976,7 +5985,7 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI ...@@ -5976,7 +5985,7 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
if (pInfo->pDataBlock->info.rows >= pInfo->numOfRowsInRes) { if (pInfo->pDataBlock->info.rows >= capacity) {
return pInfo->pDataBlock; return pInfo->pDataBlock;
} }
} }
...@@ -5984,6 +5993,108 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI ...@@ -5984,6 +5993,108 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
} }
static int32_t doInternalSort(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo) {
size_t numOfSources = taosArrayGetSize(pInfo->pSources);
// Calculate the I/O counts to complete the data sort.
double sortCount = floorl(log2(numOfSources) / log2(getNumOfInMemBufPages(pInfo->pSortInternalBuf)));
pInfo->totalElapsed = taosGetTimestampUs() - pInfo->startTs;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64,
GET_TASKID(pTaskInfo), (int32_t) (sortCount + 1), getTotalBufSize(pInfo->pSortInternalBuf), pInfo->sortElapsed,
pInfo->totalElapsed);
size_t pgSize = getBufPageSize(pInfo->pSortInternalBuf);
int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pInfo->pDataBlock))/ blockDataGetSerialRowSize(pInfo->pDataBlock);
blockDataEnsureCapacity(pInfo->pDataBlock, numOfRows);
size_t numOfSorted = taosArrayGetSize(pInfo->pSources);
for(int32_t t = 0; t < sortCount; ++t) {
int64_t st = taosGetTimestampUs();
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
SMsortComparParam resultParam = {.orderInfo = pInfo->cmpParam.orderInfo};
int32_t numOfInputSources = getNumOfInMemBufPages(pInfo->pSortInternalBuf);
int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
for(int32_t i = 0; i < sortGroup; ++i) {
pInfo->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
if (end > numOfSorted - 1) {
end = numOfSorted - 1;
}
pInfo->cmpParam.numOfSources = end - i * numOfInputSources + 1;
int32_t code = sortComparInit(&pInfo->cmpParam, pInfo->pSources, i * numOfInputSources, end, pInfo->pSortInternalBuf);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
while (1) {
SSDataBlock* pDataBlock = getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, numOfRows);
if (pDataBlock == NULL) {
break;
}
int32_t pageId = -1;
SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId);
if (pPage == NULL) {
assert(0);
longjmp(pTaskInfo->env, terrno);
}
int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pInfo->pSortInternalBuf));
blockDataToBuf(pPage->data, pDataBlock);
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pSortInternalBuf, pPage);
blockDataClearup(pDataBlock, pInfo->hasVarCol);
}
tMergeTreeDestroy(pInfo->pMergeTree);
pInfo->numOfCompleted = 0;
code = doAddNewSource(pInfo, pResList, pInfo->pDataBlock->info.numOfCols);
if (code != 0) {
longjmp(pTaskInfo->env, code);
}
}
sortComparClearup(&pInfo->cmpParam);
taosArrayClear(pInfo->pSources);
taosArrayAddAll(pInfo->pSources, pResList);
taosArrayDestroy(pResList);
pInfo->cmpParam = resultParam;
numOfSorted = taosArrayGetSize(pInfo->pSources);
int64_t el = taosGetTimestampUs() - st;
pInfo->totalElapsed += el;
SDiskbasedBufStatis statis = getDBufStatis(pInfo->pSortInternalBuf);
qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", GET_TASKID(pTaskInfo), t + 1, el, statis.loadBytes/1024.0,
statis.flushBytes/1024.0);
}
pInfo->cmpParam.numOfSources = taosArrayGetSize(pInfo->pSources);
return 0;
}
static SSDataBlock* doSort(void* param, bool* newgroup) { static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -5995,9 +6106,11 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -5995,9 +6106,11 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam); return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, pInfo->numOfRowsInRes);
} }
int64_t st = taosGetTimestampUs();
while(1) { while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
...@@ -6018,9 +6131,11 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -6018,9 +6131,11 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs(); int64_t p = taosGetTimestampUs();
blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst); blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst);
printf("sort time:%ld\n", taosGetTimestampUs() - p);
addToDiskbasedBuf(pInfo, pTaskInfo->env); int64_t el = taosGetTimestampUs() - p;
pInfo->sortElapsed += el;
addToDiskbasedBuf(pInfo, pInfo->pSources, pTaskInfo->env);
} }
} }
...@@ -6035,14 +6150,19 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -6035,14 +6150,19 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock; return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock;
} }
addToDiskbasedBuf(pInfo, pTaskInfo->env); addToDiskbasedBuf(pInfo, pInfo->pSources, pTaskInfo->env);
} }
int32_t rowSize = blockDataGetRowSize(pInfo->pDataBlock); doInternalSort(pTaskInfo, pInfo);
int32_t numOfRows = getBufPageSize(pInfo->pSortInternalBuf)/rowSize;
blockDataEnsureCapacity(pInfo->pDataBlock, numOfRows); int32_t code = blockDataEnsureCapacity(pInfo->pDataBlock, pInfo->numOfRowsInRes);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
int32_t code = sortComparInit(&pInfo->cmpParam, pInfo); int32_t numOfSources = taosArrayGetSize(pInfo->pSources);
ASSERT(numOfSources <= getNumOfInMemBufPages(pInfo->pSortInternalBuf));
code = sortComparInit(&pInfo->cmpParam, pInfo->pSources, 0, numOfSources - 1, pInfo->pSortInternalBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
...@@ -6053,7 +6173,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -6053,7 +6173,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam); return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, pInfo->numOfRowsInRes);
} }
static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
...@@ -6089,9 +6209,10 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI ...@@ -6089,9 +6209,10 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
return NULL; return NULL;
} }
pInfo->sortBufSize = 1024 * 1024 * 50; // 1MB pInfo->sortBufSize = 1024 * 16; // 1MB
pInfo->bufPageSize = 64 * 1024; pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 4096; pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes); pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes);
pInfo->pSources = taosArrayInit(4, POINTER_BYTES); pInfo->pSources = taosArrayInit(4, POINTER_BYTES);
pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal); pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal);
...@@ -6104,7 +6225,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI ...@@ -6104,7 +6225,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
} }
} }
int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->bufPageSize*1000, 1, "/tmp/"); int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->sortBufSize, 1, "/tmp/");
if (pInfo->pSources == NULL || code != 0 || pInfo->cmpParam.orderInfo == NULL || pInfo->pDataBlock == NULL) { if (pInfo->pSources == NULL || code != 0 || pInfo->cmpParam.orderInfo == NULL || pInfo->pDataBlock == NULL) {
tfree(pOperator); tfree(pOperator);
destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo)); destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo));
...@@ -6190,8 +6311,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { ...@@ -6190,8 +6311,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
} }
// table scan order // table scan order
int32_t order = TSDB_ORDER_ASC;//pQueryAttr->order.order; int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
...@@ -6229,10 +6349,10 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { ...@@ -6229,10 +6349,10 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
closeAllResultRows(&pInfo->resultRowInfo); closeAllResultRows(&pInfo->resultRowInfo);
updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
// initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo); initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo);
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
if (pInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
......
...@@ -86,7 +86,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { ...@@ -86,7 +86,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
for(int32_t i = 0; i < numOfRows; ++i) { for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
int32_t v = rand();//(++pInfo->startVal); int32_t v = (--pInfo->startVal);
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false); colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false);
// sprintf(buf, "this is %d row", i); // sprintf(buf, "this is %d row", i);
...@@ -110,7 +110,7 @@ SOperatorInfo* createDummyOperator(int32_t numOfBlocks) { ...@@ -110,7 +110,7 @@ SOperatorInfo* createDummyOperator(int32_t numOfBlocks) {
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
pInfo->max = numOfBlocks; pInfo->max = numOfBlocks;
pInfo->startVal = 5000000; pInfo->startVal = 1500000;
pOperator->info = pInfo; pOperator->info = pInfo;
return pOperator; return pOperator;
...@@ -298,7 +298,7 @@ TEST(testCase, external_sort_Test) { ...@@ -298,7 +298,7 @@ TEST(testCase, external_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1); // taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(50000), pExprInfo, pOrderVal); SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(1500), pExprInfo, pOrderVal);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
...@@ -321,12 +321,13 @@ TEST(testCase, external_sort_Test) { ...@@ -321,12 +321,13 @@ TEST(testCase, external_sort_Test) {
break; break;
} }
// SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0)); SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1)); // SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
// for (int32_t i = 0; i < pRes->info.rows; ++i) { for (int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGet(pCol2, i); // char* p = colDataGet(pCol2, i);
printf("%d: %d\n", total++, ((int32_t*)pCol1->pData)[i]);
// printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); // printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
// } }
} }
printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf); printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf);
......
...@@ -28,15 +28,6 @@ typedef struct SPageInfo { ...@@ -28,15 +28,6 @@ typedef struct SPageInfo {
bool dirty:1; // set current buffer page is dirty or not bool dirty:1; // set current buffer page is dirty or not
} SPageInfo; } SPageInfo;
typedef struct SDiskbasedBufStatis {
int64_t flushBytes;
int64_t loadBytes;
int32_t loadPages;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SDiskbasedBufStatis;
typedef struct SDiskbasedBuf { typedef struct SDiskbasedBuf {
int32_t numOfPages; int32_t numOfPages;
int64_t totalBufSize; int64_t totalBufSize;
...@@ -56,8 +47,8 @@ typedef struct SDiskbasedBuf { ...@@ -56,8 +47,8 @@ typedef struct SDiskbasedBuf {
uint64_t nextPos; // next page flush position uint64_t nextPos; // next page flush position
uint64_t qId; // for debug purpose uint64_t qId; // for debug purpose
SDiskbasedBufStatis statis;
bool printStatis; // Print statistics info when closing this buffer. bool printStatis; // Print statistics info when closing this buffer.
SDiskbasedBufStatis statis;
} SDiskbasedBuf; } SDiskbasedBuf;
static void printStatisData(const SDiskbasedBuf* pBuf); static void printStatisData(const SDiskbasedBuf* pBuf);
...@@ -130,7 +121,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba ...@@ -130,7 +121,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba
return data; return data;
} }
*dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0); *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize+sizeof(SFilePage), ONE_STAGE_COMP, NULL, 0);
if (*dst > 0) { if (*dst > 0) {
memcpy(data, pBuf->assistBuf, *dst); memcpy(data, pBuf->assistBuf, *dst);
} }
...@@ -164,7 +155,11 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -164,7 +155,11 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
assert(!pg->used && pg->pData != NULL); assert(!pg->used && pg->pData != NULL);
int32_t size = -1; int32_t size = -1;
char* t = doCompressData(GET_DATA_PAYLOAD(pg), pBuf->pageSize, &size, pBuf); char* t = NULL;
if (pg->offset == -1 || pg->dirty) {
SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg);
t = doCompressData(pPage->data, pBuf->pageSize, &size, pBuf);
}
// this page is flushed to disk for the first time // this page is flushed to disk for the first time
if (pg->offset == -1) { if (pg->offset == -1) {
...@@ -225,7 +220,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -225,7 +220,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
} }
char* pDataBuf = pg->pData; char* pDataBuf = pg->pData;
memset(pDataBuf, 0, pBuf->pageSize); memset(pDataBuf, 0, pBuf->pageSize + sizeof(SFilePage));
pg->pData = NULL; // this means the data is not in buffer pg->pData = NULL; // this means the data is not in buffer
pg->length = size; pg->length = size;
...@@ -256,7 +251,8 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -256,7 +251,8 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return ret; return ret;
} }
ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->length, pBuf->file); SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg);
ret = (int32_t)fread(pPage->data, 1, pg->length, pBuf->file);
if (ret != pg->length) { if (ret != pg->length) {
ret = TAOS_SYSTEM_ERROR(errno); ret = TAOS_SYSTEM_ERROR(errno);
return ret; return ret;
...@@ -266,7 +262,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -266,7 +262,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
pBuf->statis.loadPages += 1; pBuf->statis.loadPages += 1;
int32_t fullSize = 0; int32_t fullSize = 0;
doDecompressData(GET_DATA_PAYLOAD(pg), pg->length, &fullSize, pBuf); doDecompressData(pPage->data, pg->length, &fullSize, pBuf);
return 0; return 0;
} }
...@@ -558,7 +554,7 @@ int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { ...@@ -558,7 +554,7 @@ int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
} }
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) { int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
return pBuf->inMemPages; return pBuf->inMemPages;
} }
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
...@@ -577,6 +573,10 @@ void printStatisBeforeClose(SDiskbasedBuf* pBuf) { ...@@ -577,6 +573,10 @@ void printStatisBeforeClose(SDiskbasedBuf* pBuf) {
pBuf->printStatis = true; pBuf->printStatis = true;
} }
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
return pBuf->statis;
}
void printStatisData(const SDiskbasedBuf* pBuf) { void printStatisData(const SDiskbasedBuf* pBuf) {
if (!pBuf->printStatis) { if (!pBuf->printStatis) {
return; return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册