提交 b0cabda1 编写于 作者: H Haojun Liao

[td-2895] refactor

上级 232a4415
......@@ -411,6 +411,7 @@ typedef struct SHashGroupbyOperatorInfo {
SQLFunctionCtx *pCtx;
SResultRowInfo resultRowInfo;
SSDataBlock *pRes;
int32_t colIndex;
} SHashGroupbyOperatorInfo;
void freeParam(SQueryParam *param);
......
......@@ -91,6 +91,6 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo);
bool incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv);
#endif // TDENGINE_QUERYUTIL_H
......@@ -154,6 +154,8 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
}
static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols);
static void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
......@@ -188,11 +190,16 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo,
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock);
static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock);
static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size);
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
......@@ -300,6 +307,33 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
return maxOutput;
}
int64_t getNumOfResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool hasMainFunction = hasMainOutput(pQuery);
int64_t maxOutput = 0;
for (int32_t j = 0; j < numOfOutput; ++j) {
int32_t functionId = pCtx[j].functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if (hasMainFunction &&
(functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) {
continue;
}
SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
if (pResInfo != NULL && maxOutput < pResInfo->numOfRes) {
maxOutput = pResInfo->numOfRes;
}
}
assert(maxOutput >= 0);
return maxOutput;
}
/*
* the value of number of result needs to be update due to offset value upated.
*/
......@@ -1302,11 +1336,9 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
}
}
static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SResultRowInfo*pResultRowInfo = &pRuntimeEnv->resultRowInfo;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
......@@ -1389,32 +1421,32 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOpera
}
}
static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SResultRowInfo *pResultRowInfo, SQLFunctionCtx *pCtx,
SSDataBlock *pSDataBlock, int32_t colIndex) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current;
SDataBlockInfo* pBlockInfo = &pSDataBlock->info;
int16_t type = 0;
int16_t bytes = 0;
char* groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pSDataBlock->pDataBlock);
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, colIndex);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
for (int32_t j = 0; j < pBlockInfo->rows; ++j) {
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
int32_t offset = GET_COL_DATA_POS(pQuery, j, 1);
char *val = groupbyColumnData + bytes * offset;
char *val = pColInfoData->pData + bytes * offset;
if (isNull(val, type)) { // ignore the null value
continue;
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex);
// TODO compare with the previous value to speedup the query processing
int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, pResultRowInfo, pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
......@@ -1545,7 +1577,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
}
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// not assign result buffer yet, add new result buffer, TODO remove it
......@@ -1560,7 +1592,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
return -1;
}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, d, len, true, groupIndex);
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex);
assert (pResultRow != NULL);
int64_t v = -1;
......@@ -1588,6 +1620,50 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
return TSDB_CODE_SUCCESS;
}
static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData;
int16_t len = bytes;
if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) {
d = varDataVal(pData);
len = varDataLen(pData);
} else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo);
return -1;
}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex);
assert (pResultRow != NULL);
int64_t v = -1;
GET_TYPED_DATA(v, int64_t, type, pData);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (pResultRow->key == NULL) {
pResultRow->key = malloc(varDataTLen(pData));
varDataCopy(pResultRow->key, pData);
} else {
assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
}
} else {
pResultRow->win.skey = v;
pResultRow->win.ekey = v;
}
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage);
if (ret != 0) {
return -1;
}
}
setResultOutputBuf_rv(pRuntimeEnv, pResultRow, pCtx, numOfCols);
initCtxOutputBuf_rv(pCtx, numOfCols);
return TSDB_CODE_SUCCESS;
}
static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock) {
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
......@@ -1612,6 +1688,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
*type = pQuery->colList[colIndex].type;
*bytes = pQuery->colList[colIndex].bytes;
/*
* the colIndex is acquired from the first tables of all qualified tables in this vnode during query prepare
* stage, the remain tables may not have the required column in cache actually. So, the validation of required
......@@ -1630,6 +1707,26 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
return NULL;
}
static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock) {
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
continue;
}
int32_t colId = pColIndex->colId;
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
if (pColInfo->info.colId == colId) {
return i;
}
}
}
assert(0);
}
static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -1931,7 +2028,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
continue;
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex);
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
......@@ -2356,25 +2453,38 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv));
// group by normal column, sliding window query, interval query are handled by interval query processor
if (!pQuery->stableQuery) { // interval (down sampling operation)
if (isFixedOutputQuery(pQuery)) {
pRuntimeEnv->proot = createAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
// if (!pQuery->stableQuery) { // interval (down sampling operation)
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pQuery->stableQuery) {
pRuntimeEnv->proot = createStableIntervalOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
} else {
pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
if (pQuery->fillType != TSDB_FILL_NONE) {
pRuntimeEnv->proot = createFillOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
}
} else if (pQuery->groupbyColumn) {
pRuntimeEnv->proot = createHashGroupbyAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
} else if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
} else if (isFixedOutputQuery(pQuery)) {
pRuntimeEnv->proot = createAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
if (pQuery->fillType != TSDB_FILL_NONE) {
pRuntimeEnv->proot = createFillOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
......@@ -2387,7 +2497,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
if (pQuery->limit.limit > 0) {
pRuntimeEnv->proot = createLimitOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
}
// }
return TSDB_CODE_SUCCESS;
......@@ -3053,11 +3163,97 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
return TSDB_CODE_SUCCESS;
}
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo * pWindowResInfo,
void* pQueryHandle, SSDataBlock* pBlock, uint32_t* status) {
void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) {
int32_t numOfRows = pBlock->info.rows;
int8_t *p = calloc(numOfRows, sizeof(int8_t));
bool all = true;
for (int32_t i = 0; i < numOfRows; ++i) {
bool qualified = false;
for (int32_t k = 0; k < numOfFilterCols; ++k) {
char *pElem = (char *)pFilterInfo[k].pData + pFilterInfo[k].info.bytes * i;
qualified = false;
for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) {
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
bool isnull = isNull(pElem, pFilterInfo[k].info.type);
if (isnull) {
if (pFilterElem->fp == isNullOperator) {
qualified = true;
break;
} else {
continue;
}
} else {
if (pFilterElem->fp == notNullOperator) {
qualified = true;
break;
} else if (pFilterElem->fp == isNullOperator) {
continue;
}
}
if (pFilterElem->fp(pFilterElem, pElem, pElem, pFilterInfo[k].info.type)) {
qualified = true;
break;
}
}
if (!qualified) {
break;
}
}
p[i] = qualified ? 1 : 0;
if (!qualified) {
all = false;
}
}
if (!all) {
int32_t start = 0;
int32_t len = 0;
for (int32_t j = 0; j < numOfRows; ++j) {
if (p[j] == 1) {
len++;
} else {
if (len > 0) {
int32_t cstart = j - len;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColumnInfoData->info.bytes;
memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes);
}
start += len;
}
}
}
pBlock->info.rows = start;
pBlock->pBlockStatis = NULL; // clean the block statistics info
if (start > 0) {
SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0);
assert(pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
pColumnInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
pBlock->info.window.skey = *(int64_t *)pColumnInfoData->pData;
pBlock->info.window.ekey = *(int64_t *)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1));
}
}
tfree(p);
}
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRowInfo *pWindowResInfo,
void *pQueryHandle, SSDataBlock *pBlock, uint32_t *status) {
*status = BLK_DATA_NO_NEEDED;
pBlock->pDataBlock = NULL;
pBlock->pDataBlock = NULL;
pBlock->pBlockStatis = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -3158,6 +3354,23 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx*
if (pBlock->pDataBlock == NULL) {
return terrno;
}
if (pQuery->numOfFilterCols > 0) {
if (pQuery->pFilterInfo[0].pData == NULL) {
for(int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j);
if (pQuery->pFilterInfo[i].info.colId == pColInfo->info.colId) {
pQuery->pFilterInfo[i].pData = pColInfo->pData;
break;
}
}
}
}
filterDataBlock_rv(pQuery->pFilterInfo, pQuery->numOfFilterCols, pBlock);
}
}
return TSDB_CODE_SUCCESS;
......@@ -3478,6 +3691,64 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable) {
}
}
}
void setTagVal_rv(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, SExprInfo* pExpr, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SExprInfo *pExprInfo = &pQuery->pExpr1[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pQuery->stableQuery) {
assert(pExprInfo->base.numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes);
} else {
// set tag value, by which the results are aggregated.
int32_t offset = 0;
memset(pRuntimeEnv->tagVal, 0, pQuery->tagLen);
for (int32_t idx = 0; idx < numOfOutput; ++idx) {
SExprInfo* pLocalExprInfo = &pExpr[idx];
// ts_comp column required the tag value for join filter
if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
continue;
}
// todo use tag column index to optimize performance
doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->type, pLocalExprInfo->bytes);
if (IS_NUMERIC_TYPE(pLocalExprInfo->type) || pLocalExprInfo->type == TSDB_DATA_TYPE_BOOL) {
memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i64, pLocalExprInfo->bytes);
} else {
memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pRuntimeEnv->pCtx[idx].tag.nLen);
}
offset += pLocalExprInfo->bytes;
}
// set the join tag for first column
SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pRuntimeEnv->pTsBuf != NULL &&
pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(pFuncMsg->numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
SColumnInfo *pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
doSetTagValueInParam(pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes);
int16_t tagType = pRuntimeEnv->pCtx[0].tag.nType;
if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pQInfo,
pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.pz);
} else {
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pQInfo,
pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.i64);
}
}
}
}
static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t srcDataType) {
if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
......@@ -3592,7 +3863,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
// all results in current group have been returned to client, try next group
if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) {
assert(pGroupResInfo->index == 0);
if ((pQInfo->code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pQInfo)) != TSDB_CODE_SUCCESS) {
if ((pQInfo->code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv)) != TSDB_CODE_SUCCESS) {
return;
}
}
......@@ -3614,6 +3885,36 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
}
void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock) {
SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo;
int32_t code = TSDB_CODE_SUCCESS;
while(pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
// all results in current group have been returned to client, try next group
if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) {
assert(pGroupResInfo->index == 0);
if ((code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv)) != TSDB_CODE_SUCCESS) {
return;
}
}
doCopyToSData_rv(pRuntimeEnv, pGroupResInfo, TSDB_ORDER_ASC, pBlock);
// current data are all dumped to result buffer, clear it
if (!hasRemainData(pGroupResInfo)) {
cleanupGroupResInfo(pGroupResInfo);
if (!incNextGroup(pGroupResInfo)) {
SET_STABLE_QUERY_OVER(pRuntimeEnv);
}
}
// enough results in data buffer, return
if (pBlock->info.rows >= threshold) {
break;
}
}
}
static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
......@@ -4216,7 +4517,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
setResultOutputBuf(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pExpr1[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
aAggs[pRuntimeEnv->pCtx[j].functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
}
/*
......@@ -4233,6 +4534,40 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
if (pQuery->groupbyColumn) {
closeAllResultRows(pResultRowInfo);
}
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow *buf = pResultRowInfo->pResult[i];
if (!isResultRowClosed(pResultRowInfo, i)) {
continue;
}
setResultOutputBuf_rv(pRuntimeEnv, buf, pCtx, numOfOutput);
for (int32_t j = 0; j < numOfOutput; ++j) {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
}
/*
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
* the top and bottom query
*/
buf->numOfRows = (uint16_t)getNumOfResult(pRuntimeEnv);
}
} else {
for (int32_t j = 0; j < numOfOutput; ++j) {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
}
}
}
static bool hasMainOutput(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
......@@ -4281,8 +4616,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
* @param pRuntimeEnv
* @param pDataBlockInfo
*/
void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
void setExecutionContext(SQueryRuntimeEnv *pRuntimeEnv, int32_t groupIndex, TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
......@@ -4337,6 +4671,26 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) {
}
}
void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
for (int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv, i, pResult, page);
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
}
/*
* set the output buffer information and intermediate buffer,
* not all queries require the interResultBuf, such as COUNT
*/
pCtx[i].resultInfo = getResultCell(pRuntimeEnv, pResult, i);
}
}
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -4458,8 +4812,7 @@ int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) {
* merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
* is a previous result generated or not.
*/
void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo *pTableQueryInfo = pQuery->current;
SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo;
......@@ -5210,9 +5563,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
return TSDB_CODE_SUCCESS;
}
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) {
return TSDB_CODE_SUCCESS;
}
// if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) {
// return TSDB_CODE_SUCCESS;
// }
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
......@@ -5408,9 +5761,8 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
static FORCE_INLINE void setEnvForEachBlock(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pQuery->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
......@@ -5422,9 +5774,9 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa
}
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
setIntervalQueryRange(pQInfo, pBlockInfo->window.skey);
setIntervalQueryRange(pRuntimeEnv, pBlockInfo->window.skey);
} else { // non-interval query
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
setExecutionContext(pRuntimeEnv, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
}
}
......@@ -5471,7 +5823,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
if (!pQuery->groupbyColumn) {
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
setEnvForEachBlock(pRuntimeEnv, *pTableQueryInfo, &blockInfo);
}
if (pQuery->stabledev) {
......@@ -5670,7 +6022,7 @@ static void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo) {
*
* @param pQInfo
*/
static void sequentialTableProcess(SQInfo *pQInfo) {
static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_COMPLETED);
......@@ -6133,7 +6485,7 @@ static void doCloseAllTimeWindow(SQInfo *pQInfo) {
}
}
static void multiTableQueryProcess(SQInfo *pQInfo) {
static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -6266,8 +6618,9 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
tfree(arithSup.data);
}
static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
SSDataBlock *pBlock = &pTableScanInfo->block;
SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery;
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
pTableScanInfo->numOfBlocks += 1;
......@@ -6275,6 +6628,17 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
// todo check for query cancel
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
if (pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables > 1) {
STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet(
pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.map, &pBlock->info.tid, sizeof(pBlock->info.tid));
if (pTableQueryInfo == NULL) {
break;
}
pQuery->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
}
// this function never returns error?
uint32_t status;
int32_t code =
......@@ -6303,7 +6667,7 @@ static SSDataBlock* doTableScan(void* param) {
SQuery* pQuery = pRuntimeEnv->pQuery;
while (pTableScanInfo->current < pTableScanInfo->times) {
SSDataBlock* p = doScanTableImpl(pTableScanInfo);
SSDataBlock* p = doTableScanImpl(pTableScanInfo);
if (p != NULL) {
return p;
}
......@@ -6347,7 +6711,7 @@ static SSDataBlock* doTableScan(void* param) {
pTableScanInfo->reverseTimes = 0;
pTableScanInfo->order = cond.order;
SSDataBlock* p = doScanTableImpl(pTableScanInfo);
SSDataBlock* p = doTableScanImpl(pTableScanInfo);
if (p != NULL) {
return p;
}
......@@ -6388,11 +6752,21 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pCtx = pAggInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo;
} else if (strcasecmp(name, "HashIntervalAggOp") == 0){
SHashIntervalOperatorInfo* pIntervalInfo = pDownstream->optInfo;
} else if (strcasecmp(name, "HashIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
} else if (strcasecmp(name, "HashGroupbyAggOp") == 0) {
SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pGroupbyInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->resultRowInfo;
} else if (strcasecmp(name, "STableIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
} else {
assert(0);
}
......@@ -6467,9 +6841,12 @@ static SSDataBlock* doAggregation(void* param) {
pOperator->completed = true;
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
pRes->info.rows = getNumOfResult(pRuntimeEnv);
if (!pQuery->stableQuery) {
finalizeQueryResult(pRuntimeEnv);
}
pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pCtx, pOperator->numOfOutput);
destroySQLFunctionCtx(pCtx, pRes->info.numOfCols);
return pRes;
......@@ -6501,6 +6878,8 @@ static SSDataBlock* doArithmeticOperation(void* param) {
break;
}
setTagVal_rv(pRuntimeEnv, pRuntimeEnv->pQuery->current->pTable, pOperator->pExpr, pArithInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pArithInfo->pCtx[i].size = pBlock->info.rows;
......@@ -6628,14 +7007,14 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, order);
hashIntervalAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock);
hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, pIntervalInfo->pCtx, pBlock);
}
closeAllResultRows(&pRuntimeEnv->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
......@@ -6645,7 +7024,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
return pIntervalInfo->pRes;
}
static SSDataBlock* doHashGroupbyAgg(void* param) {
static SSDataBlock* doSTableIntervalAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
......@@ -6656,6 +7035,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
......@@ -6663,8 +7043,11 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
return pIntervalInfo->pRes;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
SOperatorInfo* upstream = pOperator->upstream;
pRuntimeEnv->pQuery->pos = 0;
pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
......@@ -6672,17 +7055,25 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
break;
}
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
STableScanInfo* pScanInfo = upstream->optInfo;
order = getTableScanOrder(pScanInfo);
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
hashGroupbyAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock);
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, order);
setEnvForEachBlock(pRuntimeEnv, pRuntimeEnv->pQuery->current, &pBlock->info);
hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->pQuery->current->resInfo, pIntervalInfo->pCtx, pBlock);
}
closeAllResultRows(&pRuntimeEnv->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes);
// initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
// toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
......@@ -6691,6 +7082,55 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
return pIntervalInfo->pRes;
}
static SSDataBlock* doHashGroupbyAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
SHashGroupbyOperatorInfo *pInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
return pInfo->pRes;
}
SOperatorInfo* upstream = pOperator->upstream;
pRuntimeEnv->pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
if (pInfo->colIndex == -1) {
pInfo->colIndex = getGroupbyColumnData_rv(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock);
}
hashGroupbyAgg(pRuntimeEnv, pOperator, &pInfo->resultRowInfo, pInfo->pCtx, pBlock, pInfo->colIndex);
}
closeAllResultRows(&pInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
return pInfo->pRes;
}
static SSDataBlock* doFill(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
......@@ -6858,16 +7298,42 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ
return pOperator;
}
static UNUSED_FUNC SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "STableIntervalAggOp";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = doSTableIntervalAgg;
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
return pOperator;
}
SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->colIndex = -1;
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashGroupbyOp";
pOperator->name = "HashGroupbyAggOp";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = upstream;
......@@ -7161,13 +7627,15 @@ void stableQueryImpl(SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs();
if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) {
multiTableQueryProcess(pQInfo);
} else {
assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn);
sequentialTableProcess(pQInfo);
}
// if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
// (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) {
//multiTableQueryProcess(pQInfo);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pRuntimeEnv->outputBuf != NULL? pRuntimeEnv->outputBuf->info.rows:0;
// } else {
// assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn);
// sequentialTableProcess(pQInfo);
// }
// record the total elapsed time
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
......
......@@ -438,7 +438,7 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
}
}
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, void* qinfo) {
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -456,7 +456,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
qError("QInfo:%p failed alloc memory", qinfo);
qError("QInfo:%p failed alloc memory", pRuntimeEnv->qinfo);
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
......@@ -528,7 +528,7 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
int64_t endt = taosGetTimestampMs();
qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", qinfo,
qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pRuntimeEnv->qinfo,
pGroupResInfo->currentGroup, endt - startt);
_end:
......@@ -539,14 +539,13 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes
return code;
}
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) {
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv) {
int64_t st = taosGetTimestampUs();
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, pQInfo);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -556,7 +555,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) {
break;
}
qDebug("QInfo:%p no result in group %d, continue", pQInfo, pGroupResInfo->currentGroup);
qDebug("QInfo:%p no result in group %d, continue", pRuntimeEnv->qinfo, pGroupResInfo->currentGroup);
cleanupGroupResInfo(pGroupResInfo);
incNextGroup(pGroupResInfo);
}
......@@ -566,9 +565,9 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) {
}
int64_t elapsedTime = taosGetTimestampUs() - st;
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pQInfo,
qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo,
pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
pQInfo->summary.firstStageMergeTime += elapsedTime;
// pQInfo->summary.firstStageMergeTime += elapsedTime;
return TSDB_CODE_SUCCESS;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册