提交 966ca160 编写于 作者: H Haojun Liao

[td-225] refactor

上级 9b76c49d
......@@ -289,6 +289,7 @@ typedef struct SQueryRuntimeEnv {
int32_t tableIndex;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SOperatorInfo* proot;
SGroupResInfo groupResInfo;
} SQueryRuntimeEnv;
typedef struct {
......@@ -315,7 +316,6 @@ typedef struct SQInfo {
* the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed.
*/
SGroupResInfo groupResInfo;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads
......@@ -389,6 +389,13 @@ typedef struct SOffsetOperatorInfo {
SQueryRuntimeEnv* pRuntimeEnv;
} SOffsetOperatorInfo;
typedef struct SHashIntervalOperatorInfo {
SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
} SHashIntervalOperatorInfo;
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
......
......@@ -185,6 +185,10 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size);
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
// setup the output buffer
static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) {
......@@ -204,6 +208,23 @@ static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) {
return res;
}
static void* destroyOutputBuf(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return NULL;
}
int32_t numOfOutput = pBlock->info.numOfCols;
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockStatis);
tfree(pBlock);
return NULL;
}
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
......@@ -565,7 +586,18 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo *pWindowResInfo, int64_t t
STimeWindow w = {0};
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
getAlignQueryTimeWindow(pQuery, ts, ts, pQuery->window.ekey, &w);
} else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
getAlignQueryTimeWindow(pQuery, ts, pQuery->window.ekey, ts, &w);
}
pWindowResInfo->prevSKey = w.skey;
} else {
w.skey = pWindowResInfo->prevSKey;
}
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
} else {
......@@ -861,7 +893,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
pCtx[k].ptsList = &tsCol[pos];
}
int32_t functionId = pQuery->pExpr1[k].base.functionId;
int32_t functionId = pCtx[k].functionId;
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
......@@ -1127,7 +1159,7 @@ static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo*
}
}
static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY* tsCols, int32_t step) {
static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, const TSKEY* tsCols, int32_t step) {
TSKEY ts = TSKEY_INITIAL_VAL;
if (tsCols == NULL) {
......@@ -1264,16 +1296,24 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SResultRowInfo* pWindowResInfo = &pRuntimeEnv->resultRowInfo;
SResultRowInfo*pResultRowInfo = &pRuntimeEnv->resultRowInfo;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
int32_t prevIndex = curTimeWindowIndex(pWindowResInfo);
int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = pColDataInfo->pData;
}
TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info, tsCols, step);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQuery);
bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN)? true:false;
SResultRow *pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
// goto _end;
}
......@@ -1282,60 +1322,60 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOpera
int32_t startPos = pQuery->pos;
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, pQuery->pos, ekey, binarySearchForKey, true);
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) {
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow *pRes = pWindowResInfo->pResult[j];
SResultRow *pRes = pResultRowInfo->pResult[j];
if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue;
}
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId);
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, 0);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pDataBlockInfo->rows - 1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p,
int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p,
w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows, pDataBlock);
doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pSDataBlock->info.rows, pSDataBlock->pDataBlock);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0);
assert(ret == TSDB_CODE_SUCCESS);
}
// window start key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock);
doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, pSDataBlock->pDataBlock);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQuery, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos);
startPos = getNextQualifiedWindow(pQuery, &nextWin, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
if (startPos < 0) {
break;
}
// null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId);
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, 0);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock);
doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin, startPos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, pSDataBlock->pDataBlock);
}
}
......@@ -2204,6 +2244,24 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, int32
return NULL;
}
static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
if (pCtx == NULL) {
return NULL;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
tVariantDestroy(&pCtx[i].param[j]);
}
tVariantDestroy(&pCtx[i].tag);
tfree(pCtx[i].tagInfo.pTagCtxList);
}
tfree(pCtx);
return NULL;
}
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) {
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -2261,6 +2319,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
} else if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
......@@ -2305,11 +2365,8 @@ static void doFreeQueryHandle(SQInfo* pQInfo) {
}
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
if (pRuntimeEnv->pQuery == NULL) {
return;
}
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
......@@ -2368,6 +2425,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
pRuntimeEnv->outputBuf = destroyOutputBuf(pRuntimeEnv->outputBuf);
destroyOperatorInfo(pRuntimeEnv->proot);
}
static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
......@@ -2949,6 +3009,92 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
return TSDB_CODE_SUCCESS;
}
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
*status = BLK_DATA_NO_NEEDED;
SQuery *pQuery = pRuntimeEnv->pQuery;
// int64_t groupId = pQuery->current->groupIndex;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQueryCostInfo* pCost = &pQInfo->summary;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) {
*status = BLK_DATA_ALL_NEEDED;
} else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
// if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) {
*status = BLK_DATA_ALL_NEEDED;
// }
if ((*status) != BLK_DATA_ALL_NEEDED) {
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
// if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
// SResultRow* pResult = NULL;
//
// bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
//
// TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey;
// STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
// if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) {
// // todo handle error in set result for timewindow
// }
// }
//
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base;
//
// int32_t functionId = pSqlFunc->functionId;
// int32_t colId = pSqlFunc->colInfo.colId;
// (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
// if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
// break;
// }
// }
}
}
if ((*status) == BLK_DATA_NO_NEEDED) {
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->discardBlocks += 1;
} else if ((*status) == BLK_DATA_STATIS_NEEDED) {
// this function never returns error?
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis);
pCost->loadBlockStatis += 1;
if (*pStatis == NULL) { // data block statistics does not exist, load data block
*pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pCost->totalCheckedRows += pBlockInfo->rows;
}
} else {
assert((*status) == BLK_DATA_ALL_NEEDED);
// load the data block statistics to perform further filter
pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis);
if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
// current block has been discard due to filter applied
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = BLK_DATA_DISCARD;
}
pCost->totalCheckedRows += pBlockInfo->rows;
pCost->loadBlocks += 1;
*pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
if (*pDataBlock == NULL) {
return terrno;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
int32_t midPos = -1;
int32_t numOfRows;
......@@ -3372,13 +3518,13 @@ static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGrou
void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo;
while(pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
// all results in current group have been returned to client, try next group
if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) {
assert(pGroupResInfo->index == 0);
if ((pQInfo->code = mergeIntoGroupResult(&pQInfo->groupResInfo, pQInfo)) != TSDB_CODE_SUCCESS) {
if ((pQInfo->code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pQInfo)) != TSDB_CODE_SUCCESS) {
return;
}
}
......@@ -3536,13 +3682,12 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
}
void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx, SSDataBlock* pDataBlock) {
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SSDataBlock* pDataBlock) {
int32_t tid = 0;
int64_t uid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SQLFunctionCtx *pCtx = &pSQLCtx[i];
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
/*
......@@ -3552,17 +3697,17 @@ void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i);
RESET_RESULT_INFO(pCellInfo);
pCtx->resultInfo = pCellInfo;
pCtx->pOutput = pData->pData;
pCtx[i].resultInfo = pCellInfo;
pCtx[i].pOutput = pData->pData;
// set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pSQLCtx[0].pOutput;
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
}
}
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
initCtxOutputBuf_rv(pCtx, pDataBlock->info.numOfCols);
}
void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
......@@ -3593,19 +3738,30 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
}
}
void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx) {
void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pSQLCtx[j].functionId;
pSQLCtx[j].currentStage = 0;
pCtx[j].currentStage = 0;
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (pResInfo->initialized) {
continue;
}
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pSQLCtx[j]);
aAggs[pCtx[j].functionId].init(&pCtx[j]);
}
}
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) {
pCtx[j].currentStage = 0;
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (pResInfo->initialized) {
continue;
}
aAggs[functionId].init(&pSQLCtx[j]);
aAggs[pCtx[j].functionId].init(&pCtx[j]);
}
}
......@@ -4363,9 +4519,9 @@ static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGrou
* @param pQInfo
* @param result
*/
void copyToOutputBuf(SQInfo *pQInfo, SResultRowInfo *pResultInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
SGroupResInfo *pGroupResInfo = &pQInfo->groupResInfo;
void copyToOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SGroupResInfo *pGroupResInfo = &pRuntimeEnv->groupResInfo;
assert(pQuery->rec.rows == 0 && pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
if (!hasRemainData(pGroupResInfo)) {
......@@ -4373,7 +4529,74 @@ void copyToOutputBuf(SQInfo *pQInfo, SResultRowInfo *pResultInfo) {
}
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
pQuery->rec.rows = doCopyToSData(&pQInfo->runtimeEnv, pGroupResInfo, orderType);
pQuery->rec.rows = doCopyToSData(pRuntimeEnv, pGroupResInfo, orderType);
}
static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = 0;//pQuery->rec.rows; // there are already exists result rows
int32_t start = 0;
int32_t step = -1;
qDebug("QInfo:%p start to copy data from windowResInfo to output buf", pRuntimeEnv->qinfo);
if (orderType == TSDB_ORDER_ASC) {
start = pGroupResInfo->index;
step = 1;
} else { // desc order copy all data
start = numOfRows - pGroupResInfo->index - 1;
step = -1;
}
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
SResultRow* pRow = taosArrayGetP(pGroupResInfo->pRows, i);
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
continue;
}
int32_t numOfRowsToCopy = pRow->numOfRows;
//current output space is not enough to accommodate all data of this page, prepare more space
// if (numOfRowsToCopy > (pQuery->rec.capacity - numOfResult)) {
// int32_t newSize = pQuery->rec.capacity + (numOfRowsToCopy - numOfResult);
// expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv));
// }
pGroupResInfo->index += 1;
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId);
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
int32_t bytes = pColInfoData->info.bytes;
char *out = pColInfoData->pData + numOfResult * bytes;
char *in = getPosInResultPage(pRuntimeEnv, j, pRow, page);
memcpy(out, in, bytes * numOfRowsToCopy);
}
numOfResult += numOfRowsToCopy;
if (numOfResult == pQuery->rec.capacity) { // output buffer is full
break;
}
}
qDebug("QInfo:%p copy data to query buf completed", pRuntimeEnv->qinfo);
pBlock->info.rows = numOfResult;
return 0;
}
static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
if (!hasRemainData(pGroupResInfo)) {
return;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
doCopyToSData_rv(pRuntimeEnv, pGroupResInfo, orderType, pBlock);
}
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -4510,7 +4733,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
setQueryStatus(pQuery, QUERY_OVER);
}
} else {
if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pQInfo->groupResInfo)) {
if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pRuntimeEnv->groupResInfo)) {
setQueryStatus(pQuery, QUERY_OVER);
}
}
......@@ -4985,7 +5208,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery->tsdb = tsdb;
pQuery->vgId = vgId;
pQInfo->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0;
pRuntimeEnv->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0;
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTsBuf = pTsBuf;
......@@ -5484,12 +5707,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
expandBuffer(pRuntimeEnv, pWindowResInfo->size, pQInfo);
}
initGroupResInfo(&pQInfo->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
copyToOutputBuf(pQInfo, pWindowResInfo);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
copyToOutputBuf(pRuntimeEnv, pWindowResInfo);
assert(pQuery->rec.rows == pWindowResInfo->size);
resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
cleanupGroupResInfo(&pQInfo->groupResInfo);
cleanupGroupResInfo(&pRuntimeEnv->groupResInfo);
break;
}
} else if (pQuery->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) {
......@@ -5635,8 +5858,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* If the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place.
*/
if (hasRemainData(&pQInfo->groupResInfo)) {
copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo);
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
pQuery->rec.total += pQuery->rec.rows;
if (pQuery->rec.rows > 0) {
......@@ -5821,7 +6044,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
copyResToQueryResultBuf(pQInfo, pQuery);
} else {
copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo);
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
}
qDebug("QInfo:%p current:%"PRId64", total:%"PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
......@@ -5868,8 +6091,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
copyResToQueryResultBuf(pQInfo, pQuery);
} else { // not a interval query
initGroupResInfo(&pQInfo->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
}
// handle the limitation of output buffer
......@@ -5957,7 +6180,7 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
// this function never returns error?
uint32_t status;
int32_t code = loadDataBlockOnDemand(pTableScanInfo->pRuntimeEnv, NULL, pTableScanInfo->pQueryHandle, &pBlock->info, &pBlock->pBlockStatis,
int32_t code = loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, NULL, pTableScanInfo->pQueryHandle, &pBlock->info, &pBlock->pBlockStatis,
&pBlock->pDataBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTableScanInfo->pRuntimeEnv->env, code);
......@@ -6103,7 +6326,7 @@ static SSDataBlock* doAggregation(void* param) {
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
SOperatorInfo* upstream = pOperator->upstream;
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf);
setDefaultOutputBuf(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf);
pQuery->pos = 0;
while(1) {
......@@ -6122,6 +6345,8 @@ static SSDataBlock* doAggregation(void* param) {
finalizeQueryResult(pRuntimeEnv);
pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv);
destroySQLFunctionCtx(pCtx, pRuntimeEnv->outputBuf->info.numOfCols);
return pRuntimeEnv->outputBuf;
}
......@@ -6138,7 +6363,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
}
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pArithInfo->pCtx, pRes);
setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, pRes);
SOperatorInfo* upstream = pOperator->upstream;
pRuntimeEnv->pQuery->pos = 0;
......@@ -6254,7 +6479,6 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
SOperatorInfo* upstream = pOperator->upstream;
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf);
pQuery->pos = 0;
while(1) {
......@@ -6269,10 +6493,17 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
}
pOperator->completed = true;
closeAllResultRows(&pRuntimeEnv->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv);
destroySQLFunctionCtx(pCtx, pOperator->numOfOutput);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRuntimeEnv->outputBuf);
// pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv);
return pRuntimeEnv->outputBuf;
}
......@@ -6288,6 +6519,16 @@ static int32_t getNumOfScanTimes(SQuery* pQuery) {
return 1;
}
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
if (pOperator == NULL) {
return;
}
destroyOperatorInfo(pOperator->upstream);
tfree(pOperator->optInfo);
tfree(pOperator);
}
static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
......@@ -6369,6 +6610,25 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo,
return pOperator;
}
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashIntervalAggOp";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = inputOptr;
pOperator->exec = doHashIntervalAgg;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pOperator->optInfo = pInfo;
return pOperator;
}
/*
* in each query, this function will be called only once, no retry for further result.
......@@ -6386,10 +6646,6 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pResBlock->info.rows;
// TODO limit/offset refactor to be one operator
// skipResults(pRuntimeEnv);
// limitOperator(pQuery, pQInfo);
}
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
......@@ -6455,11 +6711,11 @@ static void copyAndFillResult(SQInfo* pQInfo) {
SQuery* pQuery = pRuntimeEnv->pQuery;
while(1) {
copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo);
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
doSecondaryArithmeticProcess(pQuery);
TSKEY lastKey = 0;
if (!hasRemainData(&pQInfo->groupResInfo)) {
if (!hasRemainData(&pRuntimeEnv->groupResInfo)) {
lastKey = pQuery->window.ekey;
} else {
lastKey = ((TSKEY*)pQuery->sdata[0]->data)[pQuery->rec.rows - 1];
......@@ -6478,7 +6734,7 @@ static void copyAndFillResult(SQInfo* pQInfo) {
}
// here the pQuery->rec.rows == 0
if (!hasRemainData(&pQInfo->groupResInfo) && !taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) {
if (!hasRemainData(&pRuntimeEnv->groupResInfo) && !taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) {
break;
}
}
......@@ -6500,11 +6756,14 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
}
scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
finalizeQueryResult(pRuntimeEnv);
SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pResBlock->info.rows;
#if 0
// scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
// finalizeQueryResult(pRuntimeEnv);
// skip offset result rows
pQuery->rec.rows = 0;
// pQuery->rec.rows = 0;
// not fill or no result generated during this query
if (pQuery->fillType == TSDB_FILL_NONE || pRuntimeEnv->resultRowInfo.size == 0 || isPointInterpoQuery(pQuery)) {
......@@ -6514,22 +6773,23 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
return;
}
initGroupResInfo(&pQInfo->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset);
copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo);
doSecondaryArithmeticProcess(pQuery);
limitOperator(pQuery, pQInfo);
} else {
initGroupResInfo(&pQInfo->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
return copyAndFillResult(pQInfo);
}
#endif
}
void tableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (hasNotReturnedResults(pRuntimeEnv, &pQInfo->groupResInfo)) {
if (hasNotReturnedResults(pRuntimeEnv, &pRuntimeEnv->groupResInfo)) {
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
/*
* There are remain results that are not returned due to result interpolation
......@@ -6546,7 +6806,7 @@ void tableQueryImpl(SQInfo *pQInfo) {
} else {
pQuery->rec.rows = 0;
assert(pRuntimeEnv->resultRowInfo.size > 0);
copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo);
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
doSecondaryArithmeticProcess(pQuery);
if (pQuery->rec.rows > 0) {
......@@ -7729,7 +7989,7 @@ void freeQInfo(SQInfo *pQInfo) {
tsdbDestroyTableGroup(&pQuery->tableGroupInfo);
taosHashCleanup(pQInfo->arrTableIdInfo);
taosArrayDestroy(pQInfo->groupResInfo.pRows);
taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows);
pQInfo->signature = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册