提交 847c2823 编写于 作者: H Haojun Liao

[TD-225] fix bugs in stddev query processing.

上级 be19f211
......@@ -2970,7 +2970,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
STableMeta* pTableMeta = NULL;
SSchema* pSchema = NULL;
// SSchema s = tGetTbnameColumnSchema();
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
......
......@@ -1878,14 +1878,31 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S
}
}
static void destroySup(SFirstRoundQuerySup* pSup) {
taosArrayDestroyEx(pSup->pResult, freeInterResult);
taosArrayDestroy(pSup->pColsInfo);
tfree(pSup);
}
void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*)tres;
SSqlRes* pRes = &pSql->res;
SFirstRoundQuerySup* pSup = param;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (numOfRows > 0) {
SSqlObj* pParent = pSup->pParent;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
int32_t code = taos_errno(pSql);
if (code != TSDB_CODE_SUCCESS) {
destroySup(pSup);
taos_free_result(pSql);
pParent->res.code = code;
tscAsyncResultOnError(pParent);
return;
}
if (numOfRows > 0) { // the number is not correct for group by column in super table query
TAOS_ROW row = NULL;
int32_t numOfCols = taos_field_count(tres);
......@@ -1895,6 +1912,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
while ((row = taos_fetch_row(tres)) != NULL) {
doAppendData(&interResult, row, numOfCols, pQueryInfo);
pSup->numOfRows += 1;
}
} else { // tagLen > 0
char* p = calloc(1, pSup->tagLen);
......@@ -1906,7 +1924,9 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
int32_t offset = 0;
for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
// tag or group by column
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag) || pExpr->functionId == TSDB_FUNC_PRJ) {
memcpy(p + offset, row[i], length[i]);
offset += pExpr->resBytes;
}
......@@ -1935,20 +1955,20 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
taosArrayPush(pSup->pResult, &interResult);
doAppendData(&interResult, row, numOfCols, pQueryInfo);
}
pSup->numOfRows += 1;
}
tfree(p);
}
}
pSup->numOfRows += numOfRows;
if (!pRes->completed) {
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
return;
}
// set the parameters for the second round query process
SSqlObj *pParent = pSup->pParent;
SSqlCmd *pPCmd = &pParent->cmd;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0);
......@@ -1974,9 +1994,19 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
}
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
int32_t c = taos_errno(tres);
SFirstRoundQuerySup* pSup = (SFirstRoundQuerySup*) param;
SSqlObj* pSql = (SSqlObj*) tres;
int32_t c = taos_errno(pSql);
if (c != TSDB_CODE_SUCCESS) {
// TODO HANDLE ERROR
SSqlObj* parent = pSup->pParent;
destroySup(pSup);
taos_free_result(pSql);
parent->res.code = code;
tscAsyncResultOnError(parent);
return;
}
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
......@@ -2010,13 +2040,13 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
// goto _error;
goto _error;
}
}
if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
// goto _error;
goto _error;
}
pNewQueryInfo->interval = pQueryInfo->interval;
......@@ -2027,7 +2057,6 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
int32_t index = 0;
int32_t numOfTags = 0;
for(int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
......@@ -2060,7 +2089,25 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG);
p->resColId = pExpr->resColId;
numOfTags += 1;
} else if (pExpr->functionId == TSDB_FUNC_PRJ) {
int32_t num = taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
for(int32_t k = 0; k < num; ++k) {
SColIndex* pIndex = taosArrayGet(pNewQueryInfo->groupbyExpr.columnInfo, k);
if (pExpr->colInfo.colId == pIndex->colId) {
pSup->tagLen += pExpr->resBytes;
taosArrayPush(pSup->pColsInfo, &pExpr->resColId);
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pIndex->colIndex};
SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);
//doLimitOutputNormalColOfGroupby
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_PRJ, &colIndex, schema, TSDB_COL_NORMAL);
p->numOfParams = 1;
p->param[0].i64 = 1;
p->param[0].nType = TSDB_DATA_TYPE_INT;
p->resColId = pExpr->resColId; // update the result column id
}
}
}
}
......@@ -2077,6 +2124,13 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
tscHandleMasterSTableQuery(pNew);
return TSDB_CODE_SUCCESS;
_error:
destroySup(pSup);
taos_free_result(pNew);
pSql->res.code = terrno;
tscAsyncResultOnError(pSql);
return terrno;
}
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
......@@ -2118,7 +2172,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tfree(pMemoryBuf);
return ret;
}
tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);
pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
if (pSql->pSubs == NULL) {
......
......@@ -235,7 +235,8 @@ typedef struct SQueryRuntimeEnv {
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
......
......@@ -1630,6 +1630,97 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo));
}
static void stddev_dst_function_f(SQLFunctionCtx *pCtx, int32_t index) {
void *pData = GET_INPUT_DATA(pCtx, index);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
return;
}
// the second stage to calculate standard deviation
SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
double *retVal = &pStd->res;
// all data are null, no need to proceed
SArray* resList = (SArray*) pCtx->param[0].pz;
if (resList == NULL) {
return;
}
// find the correct group average results according to the tag value
int32_t len = (int32_t) taosArrayGetSize(resList);
assert(len > 0);
double avg = 0;
if (len == 1) {
SResPair* p = taosArrayGet(resList, 0);
avg = p->avg;
} else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result
SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare);
assert(p != NULL);
avg = p->avg;
}
int32_t num = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: {
for (int32_t i = 0; i < pCtx->size; ++i) {
if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) {
continue;
}
num += 1;
*retVal += POW2(((int32_t *)pData)[i] - avg);
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
LOOP_STDDEV_IMPL(float, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
LOOP_STDDEV_IMPL(double, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
case TSDB_DATA_TYPE_TINYINT: {
LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
LOOP_STDDEV_IMPL(int16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
LOOP_STDDEV_IMPL(uint16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
case TSDB_DATA_TYPE_UINT: {
LOOP_STDDEV_IMPL(uint32_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
LOOP_STDDEV_IMPL(int64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
LOOP_STDDEV_IMPL(uint64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
break;
}
default:
qError("stddev function not support data type:%d", pCtx->inputType);
}
pStd->num += num;
SET_VAL(pCtx, num, 1);
// copy to the final output buffer for super table
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo));
}
static void stddev_dst_merge(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -4835,7 +4926,7 @@ SAggFunctionInfo aAggs[] = {{
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE,
function_setup,
stddev_dst_function,
noop2,
stddev_dst_function_f,
no_next_step,
stddev_dst_finalizer,
stddev_dst_merge,
......
......@@ -280,6 +280,17 @@ bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) {
return false;
}
bool isStabledev(SQuery* pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functId = pQuery->pExpr1[i].base.functionId;
if (functId == TSDB_FUNC_STDDEV_DST) {
return true;
}
}
return false;
}
int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) {
assert(pGroupbyExpr != NULL);
......@@ -1637,8 +1648,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
pWindowResInfo->curIndex = index;
} else { // other queries
// decide which group this rows belongs to according to current state value
char* val = NULL;
if (groupbyColumnValue) {
char *val = groupbyColumnData + bytes * offset;
val = groupbyColumnData + bytes * offset;
if (isNull(val, type)) { // ignore the null value
continue;
}
......@@ -1649,6 +1661,34 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
}
if (pRuntimeEnv->stabledev) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
pRuntimeEnv->pCtx[k].param[0].arr = NULL;
pRuntimeEnv->pCtx[k].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// todo opt perf
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t i = 0; i < numOfGroup; ++i) {
SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, i);
if (memcmp(p->tags, val, bytes) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t f = 0; f < numOfCols; ++f) {
SStddevInterResult *pres = taosArrayGet(p->pResult, f);
if (pres->colId == pQuery->pExpr1[k].base.colInfo.colId) {
pRuntimeEnv->pCtx[k].param[0].arr = pres->pResult;
break;
}
}
}
}
}
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
......@@ -3799,7 +3839,7 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf
int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->prevResult == NULL) {
if (pRuntimeEnv->prevResult == NULL || pRuntimeEnv->groupbyColumn) {
return TSDB_CODE_SUCCESS;
}
......@@ -4602,6 +4642,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
pRuntimeEnv->stabledev = isStabledev(pQuery);
if (pTsBuf != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
......@@ -4701,13 +4742,6 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa
setTimestampListJoinInfo(pQInfo, pTableQueryInfo);
}
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) {
setParamValue(pRuntimeEnv);
break;
}
}
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
setIntervalQueryRange(pQInfo, pBlockInfo->window.skey);
} else { // non-interval query
......@@ -4761,6 +4795,15 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
}
if (pRuntimeEnv->stabledev) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) {
setParamValue(pRuntimeEnv);
break;
}
}
}
uint32_t status = 0;
SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册