提交 5dbe9cc5 编写于 作者: H Haojun Liao

[td-2819] refactor codes.

上级 cdd7eed7
......@@ -200,6 +200,7 @@ static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t
static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
int32_t groupIndex);
......@@ -1251,8 +1252,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
qError("QInfo:%p group by not supported on double/float columns, abort", pRuntimeEnv->qinfo);
......@@ -1273,6 +1276,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
memcpy(pInfo->prevData, val, bytes);
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
}
int32_t ret =
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
......@@ -3319,7 +3326,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
int32_t numOfExprs = pQueryAttr->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo1 = &(pExprInfo[i]);
if (pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) {
if (pExprInfo1->base.functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
......@@ -3347,6 +3354,36 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) {
SQueryAttr* pQuery = pRuntimeEnv->pQueryAttr;
int32_t numOfExprs = pQuery->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr1 = &pExpr[i].base;
if (pExpr1->functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (bytes == 0 || memcmp(p->tags, val, bytes) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->colId == pExpr1->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
}
/*
* There are two cases to handle:
*
......@@ -3476,9 +3513,9 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
}
}
static void updateNumOfRowsInResultRows(SQueryRuntimeEnv *pRuntimeEnv,
SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
// update the number of result for each, only update the number of rows for the corresponding window result.
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册