提交 f9997a12 编写于 作者: H Haojun Liao

[td-13039] refactor and fix bug.

上级 ec1b6001
......@@ -128,7 +128,7 @@ target_include_directories(
set(CMAKE_PROJECT_INCLUDE_BEFORE "${CMAKE_SUPPORT_DIR}/EnableCMP0048.txt.in")
add_subdirectory(zlib)
target_include_directories(
zlib
zlibstatic
PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/zlib
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/zlib
)
......
......@@ -463,13 +463,12 @@ typedef struct SAggSupporter {
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
// SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SGroupResInfo groupResInfo;
SInterval interval;
STimeWindow win;
......
......@@ -115,7 +115,8 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false;
}
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows;
// struct size + data payload + length for each column
pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows + pInput->pData->info.numOfCols * sizeof(int32_t);
pBuf->pData = malloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
......
......@@ -858,9 +858,9 @@ static void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pR
static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_t id, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset, SDiskbasedBuf *pBuf, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) {
int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) {
assert(win->skey <= win->ekey);
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId,
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pAggSup->pResultBuf, pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId,
pTaskInfo, true, pAggSup);
if (pResultRow == NULL) {
......@@ -871,7 +871,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_
// set time window for current result
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
setResultRowOutputBufInitCtx_rv(pAggSup->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
......@@ -1039,7 +1039,7 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of
pCtx[k].input.numOfRows = forwardStep;
if (tsCol != NULL) {
pCtx[k].ptsList = &tsCol[pos];
pCtx[k].ptsList = tsCol;
}
// not a whole block involved in query processing, statistics data can not be used
......@@ -1490,7 +1490,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
SResultRow* pResult = NULL;
int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1512,7 +1512,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow w = pRes->win;
ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult,
tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1530,7 +1530,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// restore current time window
ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1550,7 +1550,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// null data, failed to allocate more memory buffer
int32_t code = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &nextWin, masterScan, &pResult, tableGroupId,
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo);
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -2897,6 +2897,8 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo,
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
*status = BLK_DATA_ALL_NEEDED;
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
if (pBlock->pDataBlock == NULL) {
return terrno;
......@@ -3371,7 +3373,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
int64_t tid = 0;
int64_t groupId = 0;
SResultRow* pRow = doSetResultOutBufByKey_rv(NULL, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup);
SResultRow* pRow = doSetResultOutBufByKey_rv(pSup->pResultBuf, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
......@@ -3531,7 +3533,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
continue;
}
......@@ -4730,7 +4732,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo *pOperator, bool* newgroup) {
// }
// this function never returns error?
uint32_t status;
uint32_t status = BLK_DATA_ALL_NEEDED;
int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
......@@ -5724,7 +5726,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
return pOrderColumns;
}
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput);
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput, const char* pKey);
static void cleanupAggSup(SAggSupporter* pAggSup);
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -6102,7 +6104,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
goto _error;
}
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num);
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -6556,7 +6558,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
}
closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
OPTR_SET_OPENED(pOperator);
......@@ -6577,7 +6579,7 @@ static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
}
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
......@@ -7137,14 +7139,13 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
tfree(pOperator);
}
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput) {
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t numOfOutput, const char* pKey) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES);
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
// pAggSup->pool = initResultRowPool(pAggSup->resultRowSize);
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL ||
......@@ -7152,6 +7153,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t n
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, 4096, 4096 * 256, pKey, "/tmp/");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return TSDB_CODE_SUCCESS;
}
......@@ -7160,16 +7166,16 @@ static void cleanupAggSup(SAggSupporter* pAggSup) {
taosHashCleanup(pAggSup->pResultRowHashTable);
taosHashCleanup(pAggSup->pResultRowListSet);
taosArrayDestroy(pAggSup->pResultRowArrayList);
// destroyResultRowPool(pAggSup->pool);
destroyDiskbasedBuf(pAggSup->pResultBuf);
}
static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
int32_t numOfRows, SSDataBlock* pResultBlock) {
int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey) {
pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pRes = pResultBlock;
pBasicInfo->capacity = numOfRows;
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols);
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, pkey);
}
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
......@@ -7206,7 +7212,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
int32_t numOfRows = 1;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
goto _error;
......@@ -7271,7 +7277,6 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupAggSup(&pInfo->aggSup);
destroyDiskbasedBuf(pInfo->pResultBuf);
}
void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -7347,7 +7352,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
goto _error;
......@@ -7499,17 +7504,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->win.ekey = INT64_MAX;
int32_t numOfRows = 4096;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS/* || pInfo->pTableQueryInfo == NULL*/) {
goto _error;
}
code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
pOperator->name = "TimeIntervalAggOperator";
......@@ -7595,7 +7595,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
goto _error;
}
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......
......@@ -394,7 +394,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int32_t *pData = (int32_t*)pCol->pData;
int32_t *val = (int32_t*) buf;
for (int32_t i = 0; i < pCtx->size; ++i) {
for (int32_t i = start; i < start + numOfRows; ++i) {
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
......
......@@ -12,7 +12,7 @@ target_link_libraries(
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC zlib
PUBLIC zlibstatic
)
if (${BUILD_WITH_UV_TRANS})
if (${BUILD_WITH_UV})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册