提交 5746730a 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 4ad9e11b
......@@ -106,12 +106,11 @@ typedef struct SGroupResInfo {
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
typedef struct SResultRec {
typedef struct SRspResultInfo {
int64_t total; // total generated result size in rows
int64_t rows; // current result set size in rows
int64_t capacity; // capacity of current result output buffer
int32_t threshold; // result size threshold in rows.
} SResultRec;
} SRspResultInfo;
typedef struct SResultRowInfo {
SResultRow** pResult; // result list
......@@ -185,7 +184,6 @@ typedef struct SSDataBlock {
SDataBlockInfo info;
} SSDataBlock;
typedef struct SQuery {
SLimitVal limit;
......@@ -227,9 +225,6 @@ typedef struct SQuery {
SSingleColumnFilterInfo* pFilterInfo;
uint32_t status; // query status
SResultRec rec;
int32_t pos;
tFilePage** sdata;
STableQueryInfo* current;
int32_t numOfCheckedBlocks; // number of check data blocks
......@@ -266,15 +261,15 @@ typedef struct SQueryRuntimeEnv {
char* tagVal; // tag value of current data block
SArithmeticSupport *sasArray;
struct SOperatorInfo* pi;
SSDataBlock *outputBuf;
int32_t groupIndex;
int32_t tableIndex;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo *proot;
SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value
SRspResultInfo resultInfo;
} SQueryRuntimeEnv;
enum {
......
......@@ -209,9 +209,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator);
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size);
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
// setup the output buffer
// TODO prepare the output buffer dynamically
static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) {
// setup the output buffer for each operator
static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
SSDataBlock *res = calloc(1, sizeof(SSDataBlock));
res->info.numOfCols = numOfOutput;
......@@ -221,7 +220,7 @@ static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) {
idata.info.type = pExpr[i].type;
idata.info.bytes = pExpr[i].bytes;
idata.info.colId = pExpr[i].base.resColId;
idata.pData = calloc(4096, idata.info.bytes);
idata.pData = calloc(numOfRows, idata.info.bytes);
taosArrayPush(res->pDataBlock, &idata);
}
......@@ -862,9 +861,6 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return num;
}
static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock);
static void doBlockwiseApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow *pWin,
int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal, int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -995,6 +991,7 @@ static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow)
return ekey;
}
#if 0
//todo binary search
static UNUSED_FUNC void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) {
int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock);
......@@ -1009,7 +1006,6 @@ static UNUSED_FUNC void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) {
return NULL;
}
// todo refactor
static UNUSED_FUNC char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) {
if (pDataBlock == NULL) {
......@@ -1057,6 +1053,7 @@ static UNUSED_FUNC char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, i
return dataBlock;
}
#endif
static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
if (type == RESULT_ROW_START_INTERP) {
......@@ -1086,13 +1083,14 @@ static UNUSED_FUNC void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDat
}
}
static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, const TSKEY* tsCols, int32_t step) {
static TSKEY getStartTsKey(SQuery* pQuery, STimeWindow* win, const TSKEY* tsCols, int32_t rows) {
TSKEY ts = TSKEY_INITIAL_VAL;
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
if (tsCols == NULL) {
ts = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.skey : pDataBlockInfo->window.ekey;
ts = ascQuery? win->skey : win->ekey;
} else {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
int32_t offset = ascQuery? 0:rows-1;
ts = tsCols[offset];
}
......@@ -1216,10 +1214,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : (pSDataBlock->info.rows - 1);
int32_t startPos = pQuery->pos;
TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info, tsCols, step);
int32_t startPos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQuery);
bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN) ? true : false;
......@@ -1234,7 +1230,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t forwardStep = 0;
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep =
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, pQuery->pos, ekey, binarySearchForKey, true);
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
......@@ -1313,9 +1309,7 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat
int16_t type = pColInfoData->info.type;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
int32_t offset = GET_COL_DATA_POS(pQuery, j, 1);
char *val = pColInfoData->pData + bytes * offset;
char *val = pColInfoData->pData + bytes * j;
if (isNull(val, type)) { // ignore the null value
continue;
}
......@@ -1330,7 +1324,7 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat
pInfo->binfo.pCtx[k].size = 1; // TODO refactor: extract from here
int32_t functionId = pInfo->binfo.pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], offset);
aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j);
}
}
}
......@@ -1886,10 +1880,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// update the number of output result
if (numOfRes > 0 && pQuery->checkResultBuf == 1) {
assert(numOfRes >= pQuery->rec.rows);
pQuery->rec.rows = numOfRes;
assert(numOfRes >= pRuntimeEnv->resultInfo.rows);
pRuntimeEnv->resultInfo.rows = numOfRes;
if (numOfRes >= pQuery->rec.threshold) {
if (numOfRes >= pRuntimeEnv->resultInfo.threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
......@@ -1954,7 +1948,7 @@ void UNUSED_FUNC setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inpu
pCtx->preAggVals.dataBlockLoaded = (inputData != NULL);
// limit/offset query will affect this value
pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1;
// pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1;
// set the start position in current block
// int32_t offset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1);
......@@ -2139,6 +2133,8 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL;
}
static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) {
qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -2148,6 +2144,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
calResultBufSize(pQuery, &pRuntimeEnv->resultInfo);
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t));
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
......@@ -2177,11 +2175,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
// interval (down sampling operation)
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pQuery->stableQuery) {
pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
} else {
pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
......@@ -2194,20 +2192,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
} else if (pQuery->groupbyColumn) {
pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
} else if (isFixedOutputQuery(pQuery)) {
if (!pQuery->stableQuery) {
pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
} else {
pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
}
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
......@@ -2215,8 +2213,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
if (!onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot);
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot);
}
}
......@@ -2261,14 +2259,14 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
qDebug("QInfo:%p teardown runtime env", pQInfo);
if (isTsCompQuery(pQuery)) {
FILE *f = *(FILE **)pQuery->sdata[0]->data;
if (f) {
fclose(f);
*(FILE **)pQuery->sdata[0]->data = NULL;
}
}
// if (isTsCompQuery(pQuery)) {
// FILE *f = *(FILE **)pQuery->sdata[0]->data;
//
// if (f) {
// fclose(f);
// *(FILE **)pQuery->sdata[0]->data = NULL;
// }
// }
if (pRuntimeEnv->sasArray != NULL) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
......@@ -3510,7 +3508,7 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput;
}
memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pQuery->rec.capacity));
memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pRuntimeEnv->resultInfo.capacity));
}
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
......@@ -3633,16 +3631,16 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx) {
void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->rec.rows == 0 || pQuery->limit.offset == 0) {
if (pRuntimeEnv->resultInfo.rows == 0 || pQuery->limit.offset == 0) {
return;
}
if (pQuery->rec.rows <= pQuery->limit.offset) {
qDebug("QInfo:%p skip rows:%" PRId64 ", new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pQuery->rec.rows,
pQuery->limit.offset - pQuery->rec.rows);
if (pRuntimeEnv->resultInfo.rows <= pQuery->limit.offset) {
qDebug("QInfo:%p skip rows:%" PRId64 ", new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pRuntimeEnv->resultInfo.rows,
pQuery->limit.offset - pRuntimeEnv->resultInfo.rows);
pQuery->limit.offset -= pQuery->rec.rows;
pQuery->rec.rows = 0;
pQuery->limit.offset -= pRuntimeEnv->resultInfo.rows;
pRuntimeEnv->resultInfo.rows = 0;
resetDefaultResInfoOutputBuf(pRuntimeEnv);
......@@ -3650,25 +3648,25 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL);
} else {
int64_t numOfSkip = pQuery->limit.offset;
pQuery->rec.rows -= numOfSkip;
pRuntimeEnv->resultInfo.rows -= numOfSkip;
pQuery->limit.offset = 0;
qDebug("QInfo:%p skip row:%"PRId64", new offset:%d, numOfRows remain:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), numOfSkip,
0, pQuery->rec.rows);
0, pRuntimeEnv->resultInfo.rows);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pQuery->rec.rows * bytes));
pRuntimeEnv->pCtx[i].pOutput = ((char*) pQuery->sdata[i]->data) + pQuery->rec.rows * bytes;
memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pRuntimeEnv->resultInfo.rows * bytes));
pRuntimeEnv->pCtx[i].pOutput = ((char*) pQuery->sdata[i]->data) + pRuntimeEnv->resultInfo.rows * bytes;
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput;
}
}
updateNumOfResult(pRuntimeEnv, (int32_t)pQuery->rec.rows);
updateNumOfResult(pRuntimeEnv, (int32_t)pRuntimeEnv->resultInfo.rows);
}
}
......@@ -4428,7 +4426,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
// SQuery *pQuery = pRuntimeEnv->pQuery;
//
// int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
// int32_t numOfResult = pQuery->rec.rows; // there are already exists result rows
// int32_t numOfResult = pRuntimeEnv->resultInfo.rows; // there are already exists result rows
//
// int32_t start = 0;
// int32_t step = -1;
......@@ -4452,8 +4450,8 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
// int32_t numOfRowsToCopy = pRow->numOfRows;
//
// //current output space is not enough to accommodate all data of this page, prepare more space
// if (numOfRowsToCopy > (pQuery->rec.capacity - numOfResult)) {
// int32_t newSize = pQuery->rec.capacity + (numOfRowsToCopy - numOfResult);
// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) {
// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult);
// expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv));
// }
//
......@@ -4469,7 +4467,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
// }
//
// numOfResult += numOfRowsToCopy;
// if (numOfResult == pQuery->rec.capacity) { // output buffer is full
// if (numOfResult == pRuntimeEnv->resultInfo.capacity) { // output buffer is full
// break;
// }
// }
......@@ -4516,8 +4514,8 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG
int32_t numOfRowsToCopy = pRow->numOfRows;
//current output space is not enough to accommodate all data of this page, prepare more space
// if (numOfRowsToCopy > (pQuery->rec.capacity - numOfResult)) {
// int32_t newSize = pQuery->rec.capacity + (numOfRowsToCopy - numOfResult);
// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) {
// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult);
// expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv));
// }
......@@ -4538,7 +4536,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG
}
numOfResult += numOfRowsToCopy;
if (numOfResult == pQuery->rec.capacity) { // output buffer is full
if (numOfResult == pRuntimeEnv->resultInfo.capacity) { // output buffer is full
break;
}
}
......@@ -4599,7 +4597,7 @@ static UNUSED_FUNC void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEn
STableQueryInfo* pTableQueryInfo = pQuery->current;
SResultRowInfo * pResultRowInfo = &pTableQueryInfo->resInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
// pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
// if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pQuery->groupbyColumn) {
// rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
......@@ -4620,7 +4618,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR
return false;
}
if (pQuery->limit.limit > 0 && pQuery->rec.total >= pQuery->limit.limit) {
if (pQuery->limit.limit > 0 && pRuntimeEnv->resultInfo.total >= pQuery->limit.limit) {
return false;
}
......@@ -4639,7 +4637,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR
* set is the FIRST result block, the gap between the start time of query time window and the timestamp of the
* first result row in the actual result set will fill nothing.
*/
int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pQuery->rec.capacity);
int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity);
return numOfTotal > 0;
} else { // there are results waiting for returned to client.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) &&
......@@ -4717,7 +4715,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
while (1) {
int32_t ret = (int32_t)taosFillResultDataBlock(pFillInfo, (tFilePage**)pQuery->sdata, (int32_t)pQuery->rec.capacity);
int32_t ret = (int32_t)taosFillResultDataBlock(pFillInfo, (tFilePage**)pQuery->sdata, (int32_t)pRuntimeEnv->resultInfo.capacity);
// todo apply limit output function
/* reached the start position of according to offset value, return immediately */
......@@ -4748,7 +4746,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
}
// no data in current data after fill
int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, (int32_t)pQuery->rec.capacity);
int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, (int32_t)pRuntimeEnv->resultInfo.capacity);
if (numOfTotal == 0) {
return 0;
}
......@@ -4757,7 +4755,6 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
#endif
int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES);
......@@ -4766,7 +4763,7 @@ int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutp
p[i] = pColInfoData->pData;
}
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pQuery->rec.capacity);
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pRuntimeEnv->resultInfo.capacity);
tfree(p);
return pOutput->info.rows;
}
......@@ -5207,9 +5204,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
if (onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput);
} else if (needReverseScan(pQuery)) {
pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
pRuntimeEnv->proot = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
} else {
pRuntimeEnv->pi = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
}
if (pTsBuf != NULL) {
......@@ -5252,7 +5249,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w);
int32_t numOfCols = getNumOfFinalResCol(pQuery);
pRuntimeEnv->pFillInfo = taosCreateFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, numOfCols,
pRuntimeEnv->pFillInfo = taosCreateFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfCols,
pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision,
pQuery->fillType, pColInfo, pQInfo);
}
......@@ -5561,7 +5558,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
if (numOfRes > 0) {
pQuery->rec.rows += numOfRes;
pRuntimeEnv->resultInfo.rows += numOfRes;
forwardCtxOutputBuf(pRuntimeEnv, numOfRes);
}
......@@ -5571,7 +5568,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
// enable execution for next table, when handling the projection query
enableExecutionForNextTable(pRuntimeEnv);
if (pQuery->rec.rows >= pQuery->rec.capacity) {
if (pRuntimeEnv->resultInfo.rows >= pRuntimeEnv->resultInfo.capacity) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
break;
}
......@@ -5636,14 +5633,14 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
qDebug("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size,
pRuntimeEnv->groupIndex);
pQuery->rec.rows = 0;
if (pWindowResInfo->size > pQuery->rec.capacity) {
pRuntimeEnv->resultInfo.rows = 0;
if (pWindowResInfo->size > pRuntimeEnv->resultInfo.capacity) {
expandBuffer(pRuntimeEnv, pWindowResInfo->size, pQInfo);
}
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
copyToOutputBuf(pRuntimeEnv, pWindowResInfo);
assert(pQuery->rec.rows == pWindowResInfo->size);
assert(pRuntimeEnv->resultInfo.rows == pWindowResInfo->size);
resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
cleanupGroupResInfo(&pRuntimeEnv->groupResInfo);
......@@ -5709,7 +5706,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
}
// it is a super table ordered projection query, check for the number of output for each vgroup
if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->rec.rows >= pQuery->prjInfo.vgroupLimit) {
if (pQuery->prjInfo.vgroupLimit > 0 && pRuntimeEnv->resultInfo.rows >= pQuery->prjInfo.vgroupLimit) {
if (QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.skey >= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
......@@ -5748,9 +5745,9 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes,
pQuery->current->lastKey);
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
pRuntimeEnv->resultInfo.rows = getNumOfResult(pRuntimeEnv);
int64_t inc = pQuery->rec.rows - prev;
int64_t inc = pRuntimeEnv->resultInfo.rows - prev;
pQuery->current->resInfo.size += (int32_t) inc;
// the flag may be set by tableApplyFunctionsOnBlock, clear it here
......@@ -5759,7 +5756,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
if (pQuery->prjInfo.vgroupLimit >= 0) {
if (((pQuery->rec.rows + pQuery->rec.total) < pQuery->prjInfo.vgroupLimit) || ((pQuery->rec.rows + pQuery->rec.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) {
if (((pRuntimeEnv->resultInfo.rows + pRuntimeEnv->resultInfo.total) < pQuery->prjInfo.vgroupLimit) || ((pRuntimeEnv->resultInfo.rows + pRuntimeEnv->resultInfo.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) {
if (QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts < blockInfo.window.ekey) {
pQuery->prjInfo.ts = blockInfo.window.ekey;
} else if (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts > blockInfo.window.skey) {
......@@ -5794,9 +5791,9 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
*/
if (hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
pQuery->rec.total += pQuery->rec.rows;
pRuntimeEnv->resultInfo.total += pRuntimeEnv->resultInfo.rows;
if (pQuery->rec.rows > 0) {
if (pRuntimeEnv->resultInfo.rows > 0) {
return;
}
}
......@@ -5865,7 +5862,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
} else {
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
if (pQuery->rec.rows == 0) {
if (pRuntimeEnv->resultInfo.rows == 0) {
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
continue;
} else {
......@@ -5901,8 +5898,8 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64
" points returned, total:%" PRId64 ", offset:%" PRId64,
pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->tableIndex, numOfGroups, pQuery->rec.rows,
pQuery->rec.total, pQuery->limit.offset);
pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->tableIndex, numOfGroups, pRuntimeEnv->resultInfo.rows,
pRuntimeEnv->resultInfo.total, pQuery->limit.offset);
}
}
......@@ -5985,7 +5982,7 @@ static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) {
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
}
qDebug("QInfo:%p current:%"PRId64", total:%"PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
qDebug("QInfo:%p current:%"PRId64", total:%"PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total);
return;
}
......@@ -6034,7 +6031,7 @@ static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) {
}
// handle the limitation of output buffer
qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total + pRuntimeEnv->resultInfo.rows);
}
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
......@@ -6153,7 +6150,6 @@ static SSDataBlock* doTableScan(void* param) {
pTableScanInfo->order = cond.order;
// todo refactor, extract function
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
pResultRowInfo->curIndex = pResultRowInfo->size-1;
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey;
......@@ -6274,7 +6270,6 @@ static SSDataBlock* doAggregate(void* param) {
int32_t order = pQuery->order.order;
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
......@@ -6327,7 +6322,6 @@ static SSDataBlock* doSTableAggregate(void* param) {
int32_t order = pQuery->order.order;
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
......@@ -6371,7 +6365,6 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SArithOperatorInfo* pArithInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
pRuntimeEnv->pQuery->pos = 0;
pArithInfo->binfo.pRes->info.rows = 0;
while(1) {
......@@ -6494,7 +6487,6 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
STimeWindow win = pQuery->window;
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
......@@ -6548,7 +6540,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
int32_t order = pQuery->order.order;
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
......@@ -6599,7 +6590,6 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
}
SOperatorInfo* upstream = pOperator->upstream;
pRuntimeEnv->pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
......@@ -6703,7 +6693,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, 1);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6760,7 +6750,7 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, 1);
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6784,10 +6774,10 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
int64_t seed = rand();
pInfo->bufCapacity = 4096;
pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput);
pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
......@@ -6848,7 +6838,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6872,7 +6862,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6896,7 +6886,7 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6918,7 +6908,7 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6945,7 +6935,6 @@ static SSDataBlock* doTagScan(void* param) {
STagScanInfo *pTagScanInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1);
......@@ -6978,7 +6967,7 @@ static SSDataBlock* doTagScan(void* param) {
}
}
while(pRuntimeEnv->tableIndex < num && count < pQuery->rec.capacity) {
while(pRuntimeEnv->tableIndex < num && count < pRuntimeEnv->resultInfo.capacity) {
int32_t i = pRuntimeEnv->tableIndex++;
STableQueryInfo *item = taosArrayGetP(pa, i);
......@@ -7021,7 +7010,7 @@ static SSDataBlock* doTagScan(void* param) {
qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count);
} else*/ { // return only the tags|table name etc.
count = 0;
int32_t maxNumOfTables = (int32_t)pQuery->rec.capacity;
int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity;
SExprInfo* pExprInfo = pOperator->pExpr;
while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) {
......@@ -7063,7 +7052,7 @@ static SSDataBlock* doTagScan(void* param) {
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTagScanOp";
......@@ -7093,10 +7082,8 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0;
}
void tableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -7108,29 +7095,29 @@ void tableQueryImpl(SQInfo *pQInfo) {
* There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/
// pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
if (pQuery->rec.rows > 0) {
// pRuntimeEnv->resultInfo.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
if (pRuntimeEnv->resultInfo.rows > 0) {
limitOperator(pQuery, pQInfo);
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total);
} else {
return copyAndFillResult(pQInfo);
}
} else {
pQuery->rec.rows = 0;
pRuntimeEnv->resultInfo.rows = 0;
assert(pRuntimeEnv->resultRowInfo.size > 0);
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
doSecondaryArithmeticProcess(pQuery);
if (pQuery->rec.rows > 0) {
if (pRuntimeEnv->resultInfo.rows > 0) {
limitOperator(pQuery, pQInfo);
}
if (pQuery->rec.rows > 0) {
qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pQuery->rec.rows,
pQuery->rec.total);
if (pRuntimeEnv->resultInfo.rows > 0) {
qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pRuntimeEnv->resultInfo.rows,
pRuntimeEnv->resultInfo.total);
} else {
qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pQuery->rec.total);
qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pRuntimeEnv->resultInfo.total);
}
}
......@@ -7139,7 +7126,6 @@ void tableQueryImpl(SQInfo *pQInfo) {
#endif
// number of points returned during this query
pQuery->rec.rows = 0;
int64_t st = taosGetTimestampUs();
assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1);
......@@ -7149,7 +7135,6 @@ void tableQueryImpl(SQInfo *pQInfo) {
pQuery->current = item;
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0;
// record the total elapsed time
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
......@@ -7159,7 +7144,7 @@ void tableQueryImpl(SQInfo *pQInfo) {
void buildTableBlockDistResult(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->pos = 0;
// pQuery->pos = 0;
STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist));
pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SDataBlockInfo));
......@@ -7199,19 +7184,18 @@ void buildTableBlockDistResult(SQInfo *pQInfo) {
type = blockDistSchema.type;
}
assert(type == TSDB_DATA_TYPE_BINARY);
STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result));
// STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result));
}
freeTableBlockDist(pTableBlockDist);
pQuery->rec.rows = 1;
// pRuntimeEnv->resultInfo.rows = 1;
setQueryStatus(pQuery, QUERY_COMPLETED);
}
void stableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->rec.rows = 0;
// pRuntimeEnv->resultInfo.rows = 0;
int64_t st = taosGetTimestampUs();
......@@ -7219,7 +7203,6 @@ void stableQueryImpl(SQInfo *pQInfo) {
// (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) {
//multiTableQueryProcess(pQInfo);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pRuntimeEnv->outputBuf != NULL? pRuntimeEnv->outputBuf->info.rows:0;
// } else {
// assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn);
// sequentialTableProcess(pQInfo);
......@@ -7971,7 +7954,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
}
}
static void calResultBufSize(SQuery* pQuery) {
static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) {
const int32_t RESULT_MSG_MIN_SIZE = 1024 * (1024 + 512); // bytes
const int32_t RESULT_MSG_MIN_ROWS = 8192;
const float RESULT_THRESHOLD_RATIO = 0.85f;
......@@ -7982,11 +7965,11 @@ static void calResultBufSize(SQuery* pQuery) {
numOfRes = RESULT_MSG_MIN_ROWS;
}
pQuery->rec.capacity = numOfRes;
pQuery->rec.threshold = (int32_t)(numOfRes * RESULT_THRESHOLD_RATIO);
pResultInfo->capacity = numOfRes;
pResultInfo->threshold = (int32_t)(numOfRes * RESULT_THRESHOLD_RATIO);
} else { // in case of non-prj query, a smaller output buffer will be used.
pQuery->rec.capacity = 4096;
pQuery->rec.threshold = (int32_t)(pQuery->rec.capacity * RESULT_THRESHOLD_RATIO);
pResultInfo->capacity = 4096;
pResultInfo->threshold = (int32_t)(pResultInfo->capacity * RESULT_THRESHOLD_RATIO);
}
}
......@@ -8058,29 +8041,27 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
}
// prepare the result buffer
pQuery->sdata = (tFilePage **)calloc(pQuery->numOfOutput, POINTER_BYTES);
if (pQuery->sdata == NULL) {
goto _cleanup;
}
calResultBufSize(pQuery);
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
// allocate additional memory for interResults that are usually larger then final results
// TODO refactor
int16_t bytes = 0;
if (pQuery->pExpr2 == NULL || col >= pQuery->numOfExpr2) {
bytes = pExprs[col].bytes;
} else {
bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes);
}
// pQuery->sdata = (tFilePage **)calloc(pQuery->numOfOutput, POINTER_BYTES);
// if (pQuery->sdata == NULL) {
// goto _cleanup;
// }
size_t size = (size_t)((pQuery->rec.capacity + 1) * bytes + pExprs[col].interBytes + sizeof(tFilePage));
pQuery->sdata[col] = (tFilePage *)calloc(1, size);
if (pQuery->sdata[col] == NULL) {
goto _cleanup;
}
}
// for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
// // allocate additional memory for interResults that are usually larger then final results
// // TODO refactor
// int16_t bytes = 0;
// if (pQuery->pExpr2 == NULL || col >= pQuery->numOfExpr2) {
// bytes = pExprs[col].bytes;
// } else {
// bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes);
// }
//
// size_t size = (size_t)((pRuntimeEnv->resultInfo.capacity + 1) * bytes + pExprs[col].interBytes + sizeof(tFilePage));
// pQuery->sdata[col] = (tFilePage *)calloc(1, size);
// if (pQuery->sdata[col] == NULL) {
// goto _cleanup;
// }
// }
if (pQuery->fillType != TSDB_FILL_NONE) {
pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput);
......@@ -8115,7 +8096,6 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
pthread_mutex_init(&pQInfo->lock, NULL);
tsem_init(&pQInfo->ready, 0, 0);
pQuery->pos = -1;
pQuery->window = pQueryMsg->window;
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
......@@ -8326,13 +8306,6 @@ void freeQInfo(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (pQuery != NULL) {
if (pQuery->sdata != NULL) {
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
tfree(pQuery->sdata[col]);
}
tfree(pQuery->sdata);
}
if (pQuery->fillVal != NULL) {
tfree(pQuery->fillVal);
}
......@@ -8391,7 +8364,7 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
*/
if (isTsCompQuery(pQuery) && (*numOfRows) > 0) {
struct stat fStat;
FILE *f = *(FILE **)pQuery->sdata[0]->data;
FILE *f = NULL;//*(FILE **)pQuery->sdata[0]->data;
if ((f != NULL) && (fstat(fileno(f), &fStat) == 0)) {
*numOfRows = fStat.st_size;
return fStat.st_size;
......@@ -8406,12 +8379,13 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// the remained number of retrieved rows, not the interpolated result
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// load data from file to msg buffer
if (isTsCompQuery(pQuery)) {
FILE *f = *(FILE **)pQuery->sdata[0]->data; // TODO refactor
FILE *f = NULL;//*(FILE **)pQuery->sdata[0]->data; // TODO refactor
// make sure file exist
if (f) {
......@@ -8437,7 +8411,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
}
fclose(f);
*(FILE **)pQuery->sdata[0]->data = NULL;
// *(FILE **)pQuery->sdata[0]->data = NULL;
}
// all data returned, set query over
......@@ -8445,13 +8419,14 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
setQueryStatus(pQuery, QUERY_OVER);
}
} else {
doCopyQueryResultToMsg(pQInfo, (int32_t)pQuery->rec.rows, data);
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data);
}
pQuery->rec.total += pQuery->rec.rows;
qDebug("QInfo:%p current numOfRes rows:%" PRId64 ", total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
pRuntimeEnv->resultInfo.total += pRuntimeEnv->outputBuf->info.rows;
qDebug("QInfo:%p current numOfRes rows:%d, total:%" PRId64, pQInfo,
pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total);
if (pQuery->limit.limit > 0 && pQuery->limit.limit == pQuery->rec.total) {
if (pQuery->limit.limit > 0 && pQuery->limit.limit == pRuntimeEnv->resultInfo.total) {
qDebug("QInfo:%p results limitation reached, limitation:%"PRId64, pQInfo, pQuery->limit.limit);
setQueryStatus(pQuery, QUERY_OVER);
}
......@@ -8494,7 +8469,6 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type
void buildTagQueryResult(SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1);
......@@ -8504,9 +8478,9 @@ void buildTagQueryResult(SQInfo* pQInfo) {
}
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0;
return;
#if 0
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
size_t num = taosArrayGetSize(pa);
......@@ -8532,11 +8506,11 @@ void buildTagQueryResult(SQInfo* pQInfo) {
}
}
while(pRuntimeEnv->tableIndex < num && count < pQuery->rec.capacity) {
while(pRuntimeEnv->tableIndex < num && count < pRuntimeEnv->resultInfo.capacity) {
int32_t i = pRuntimeEnv->tableIndex++;
STableQueryInfo *item = taosArrayGetP(pa, i);
char *output = pQuery->sdata[0]->data + count * rsize;
char *output = NULL;//pQuery->sdata[0]->data + count * rsize;
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output);
......@@ -8568,7 +8542,7 @@ void buildTagQueryResult(SQInfo* pQInfo) {
qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, count);
} else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
*(int64_t*) pQuery->sdata[0]->data = num;
// *(int64_t*) pQuery->sdata[0]->data = num;
count = 1;
SET_STABLE_QUERY_OVER(pRuntimeEnv);
......@@ -8577,8 +8551,8 @@ void buildTagQueryResult(SQInfo* pQInfo) {
count = 0;
SSchema* tbnameSchema = tGetTbnameColumnSchema();
int32_t maxNumOfTables = (int32_t)pQuery->rec.capacity;
if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pQuery->rec.capacity) {
int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity;
if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pRuntimeEnv->resultInfo.capacity) {
maxNumOfTables = (int32_t)pQuery->limit.limit;
}
......@@ -8607,13 +8581,13 @@ void buildTagQueryResult(SQInfo* pQInfo) {
type = tbnameSchema->type;
data = tsdbGetTableName(item->pTable);
dst = pQuery->sdata[j]->data + count * tbnameSchema->bytes;
// dst = pQuery->sdata[j]->data + count * tbnameSchema->bytes;
} else {
type = pExprInfo[j].type;
bytes = pExprInfo[j].bytes;
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes);
dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes;
// dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes;
}
......@@ -8625,8 +8599,9 @@ void buildTagQueryResult(SQInfo* pQInfo) {
qDebug("QInfo:%p create tag values results completed, rows:%d", pQInfo, count);
}
pQuery->rec.rows = count;
pRuntimeEnv->resultInfo.rows = count;
setQueryStatus(pQuery, QUERY_COMPLETED);
#endif
}
static int64_t getQuerySupportBufSize(size_t numOfTables) {
......
......@@ -244,14 +244,15 @@ bool qTableQuery(qinfo_t qinfo) {
tableQueryImpl(pQInfo);
}
SQuery* pQuery = pRuntimeEnv->pQuery;
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
} else if (pRuntimeEnv->outputBuf->info.rows == 0) {
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
pRuntimeEnv->resultInfo.total);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
qDebug("QInfo:%p query paused, %d rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pRuntimeEnv->outputBuf->info.rows,
pRuntimeEnv->resultInfo.total + pRuntimeEnv->outputBuf->info.rows);
}
return doBuildResCheck(pQInfo);
......@@ -279,6 +280,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
*buildRes = true;
code = pQInfo->code;
} else {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pthread_mutex_lock(&pQInfo->lock);
......@@ -286,8 +288,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->resultRowSize,
pQuery->rec.rows, tstrerror(pQInfo->code));
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQuery->resultRowSize,
pRuntimeEnv->outputBuf->info.rows, tstrerror(pQInfo->code));
} else {
*buildRes = false;
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
......@@ -310,7 +312,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
}
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
int64_t s = pRuntimeEnv->outputBuf->info.rows;
size_t size = getResultSize(pQInfo, &s);
size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo);
......@@ -323,7 +328,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
(*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows);
(*pRsp)->numOfRows = htonl((int32_t)s);
if (pQInfo->code == TSDB_CODE_SUCCESS) {
(*pRsp)->offset = htobe64(pQInfo->runtimeEnv.currentOffset);
......@@ -334,7 +339,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
}
(*pRsp)->precision = htons(pQuery->precision);
if (pQuery->rec.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
if (pQInfo->runtimeEnv.outputBuf->info.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data);
} else {
setQueryStatus(pQuery, QUERY_OVER);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册