提交 d4915e95 编写于 作者: S shenglian zhou

select histogram(col) from childtable/super table support

上级 d4a509e5
...@@ -3331,6 +3331,58 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -3331,6 +3331,58 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
case TSDB_FUNC_HISTOGRAM: {
tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0);
if (pParamElem->pNode->tokenId != TK_ID) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
int32_t numBins = 4;
int16_t resultType = pSchema->type;
int32_t resultSize = pSchema->bytes;
int32_t interResult = 0;
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, numBins, &resultType, &resultSize, &interResult, 0, false,
pUdfInfo);
SExprInfo* pExpr = NULL;
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
tscExprAddParams(&pExpr->base, (char*)&numBins, TSDB_DATA_TYPE_INT, sizeof(int32_t));
double* intervals = malloc((numBins + 1) * sizeof(double));
intervals[0] = -DBL_MAX;
intervals[1] = 10;
intervals[2] = 20;
intervals[3] = 30;
intervals[4] = DBL_MAX;
tscExprAddParams(&pExpr->base, (char*)intervals, TSDB_DATA_TYPE_BINARY, sizeof(double)*(numBins+1));
memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName));
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token,sizeof(pExpr->base.aliasName) - 1);
// todo refactor: tscColumnListInsert part
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
if (finalResult) {
insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr);
} else {
assert(ids.num == 1);
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
}
tscInsertPrimaryTsSourceColumn(pQueryInfo, pExpr->base.uid);
return TSDB_CODE_SUCCESS;
}
default: { default: {
assert(!TSDB_FUNC_IS_SCALAR(functionId)); assert(!TSDB_FUNC_IS_SCALAR(functionId));
pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n); pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n);
...@@ -3703,7 +3755,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -3703,7 +3755,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) || (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) ||
(functionId == TSDB_FUNC_SAMPLE) || (functionId == TSDB_FUNC_ELAPSED)) { (functionId == TSDB_FUNC_SAMPLE) ||
(functionId == TSDB_FUNC_ELAPSED) || (functionId == TSDB_FUNC_HISTOGRAM)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes, if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
...@@ -6147,7 +6200,7 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo ...@@ -6147,7 +6200,7 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo
const char* msg1 = "value is expected"; const char* msg1 = "value is expected";
const char* msg2 = "invalid fill option"; const char* msg2 = "invalid fill option";
const char* msg3 = "top/bottom/sample not support fill"; const char* msg3 = "top/bottom/sample/histogram not support fill";
const char* msg4 = "illegal value or data overflow"; const char* msg4 = "illegal value or data overflow";
const char* msg5 = "fill only available for interval query"; const char* msg5 = "fill only available for interval query";
const char* msg7 = "join query not supported fill operation"; const char* msg7 = "join query not supported fill operation";
...@@ -6257,7 +6310,7 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo ...@@ -6257,7 +6310,7 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo
for(int32_t i = 0; i < numOfExprs; ++i) { for(int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM
|| pExpr->base.functionId == TSDB_FUNC_SAMPLE) { || pExpr->base.functionId == TSDB_FUNC_SAMPLE || pExpr->base.functionId == TSDB_FUNC_HISTOGRAM) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
} }
......
...@@ -688,7 +688,8 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { ...@@ -688,7 +688,8 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
if ((!IS_MULTIOUTPUT(aAggs[functionId].status)) || if ((!IS_MULTIOUTPUT(aAggs[functionId].status)) ||
(functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_TS_COMP || functionId == TSDB_FUNC_TS_COMP ||
functionId == TSDB_FUNC_SAMPLE)) { functionId == TSDB_FUNC_SAMPLE ||
functionId == TSDB_FUNC_HISTOGRAM)) {
return true; return true;
} }
} }
......
...@@ -79,12 +79,7 @@ extern "C" { ...@@ -79,12 +79,7 @@ extern "C" {
#define TSDB_FUNC_ELAPSED 37 #define TSDB_FUNC_ELAPSED 37
#define TSDB_FUNC_HISTOGRAM 38 #define TSDB_FUNC_HISTOGRAM 38
/////////////////////////////////////////// #define TSDB_FUNC_MAX_NUM 39
// the following functions is not implemented.
// after implementation, move them before TSDB_FUNC_BLKINFO. also make TSDB_FUNC_BLKINFO the maxium function index
// #define TSDB_FUNC_HISTOGRAM 40
// #define TSDB_FUNC_HLL 41
// #define TSDB_FUNC_MODE 42
#define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
......
...@@ -388,6 +388,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -388,6 +388,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = sizeof(SElapsedInfo); *bytes = sizeof(SElapsedInfo);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_HISTOGRAM) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (sizeof(SHistogramFuncInfo) + param * sizeof(SHistogramFuncBin));
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} }
} }
...@@ -492,6 +497,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -492,6 +497,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
*bytes = tDataTypes[*type].bytes; *bytes = tDataTypes[*type].bytes;
*interBytes = sizeof(SElapsedInfo); *interBytes = sizeof(SElapsedInfo);
} else if (functionId == TSDB_FUNC_HISTOGRAM) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = 1024;
*interBytes = *bytes;
} else { } else {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
...@@ -513,7 +522,7 @@ int32_t isValidFunction(const char* name, int32_t len) { ...@@ -513,7 +522,7 @@ int32_t isValidFunction(const char* name, int32_t len) {
} }
} }
for(int32_t i = 0; i <= TSDB_FUNC_ELAPSED; ++i) { for(int32_t i = 0; i < TSDB_FUNC_MAX_NUM; ++i) {
int32_t nameLen = (int32_t) strlen(aAggs[i].name); int32_t nameLen = (int32_t) strlen(aAggs[i].name);
if (len != nameLen) { if (len != nameLen) {
continue; continue;
...@@ -4955,8 +4964,9 @@ static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* p ...@@ -4955,8 +4964,9 @@ static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* p
return false; return false;
} }
double* listBin = (double*) pCtx->param[0].pz; double* listBin = (double*) pCtx->param[1].pz;
int32_t numOfBins = pCtx->param[0].nLen / sizeof(double) - 1; int32_t numOfBins = (int32_t)pCtx->param[0].i64;
pRes->numOfBins = numOfBins;
pRes->orderedBins = (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo)); pRes->orderedBins = (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo));
for (int32_t i = 0; i < numOfBins; ++i) { for (int32_t i = 0; i < numOfBins; ++i) {
pRes->orderedBins[i].lower = listBin[i]; pRes->orderedBins[i].lower = listBin[i];
...@@ -4989,6 +4999,7 @@ static void histogram_function(SQLFunctionCtx *pCtx) { ...@@ -4989,6 +4999,7 @@ static void histogram_function(SQLFunctionCtx *pCtx) {
for (int32_t b = 0; b < pRes->numOfBins; ++b) { for (int32_t b = 0; b < pRes->numOfBins; ++b) {
if (v > pRes->orderedBins[b].lower && v <= pRes->orderedBins[b].upper) { if (v > pRes->orderedBins[b].lower && v <= pRes->orderedBins[b].upper) {
pRes->orderedBins[b].count++; pRes->orderedBins[b].count++;
break;
} }
} }
} }
...@@ -5002,11 +5013,14 @@ static void histogram_function(SQLFunctionCtx *pCtx) { ...@@ -5002,11 +5013,14 @@ static void histogram_function(SQLFunctionCtx *pCtx) {
static void histogram_func_merge(SQLFunctionCtx *pCtx) { static void histogram_func_merge(SQLFunctionCtx *pCtx) {
SHistogramFuncInfo* pInput = (SHistogramFuncInfo*) GET_INPUT_DATA_LIST(pCtx); SHistogramFuncInfo* pInput = (SHistogramFuncInfo*) GET_INPUT_DATA_LIST(pCtx);
pInput->orderedBins = (SHistogramFuncBin*)((char*)pInput + sizeof(SHistogramFuncInfo));
SHistogramFuncInfo* pRes = getHistogramFuncOutputInfo(pCtx); SHistogramFuncInfo* pRes = getHistogramFuncOutputInfo(pCtx);
for (int32_t i = 0; i < pInput->numOfBins; ++i) { for (int32_t i = 0; i < pInput->numOfBins; ++i) {
pRes->orderedBins[i].count += pInput->orderedBins[i].count; pRes->orderedBins[i].count += pInput->orderedBins[i].count;
} }
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
} }
...@@ -5018,6 +5032,16 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5018,6 +5032,16 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) {
return; return;
} }
for (int32_t i = 0; i < pRes->numOfBins; ++i) {
int sz = sprintf(pCtx->pOutput + VARSTR_HEADER_SIZE, "(%g-%g]:%"PRId64,
pRes->orderedBins[i].lower, pRes->orderedBins[i].upper, pRes->orderedBins[i].count);
varDataSetLen(pCtx->pOutput, sz);
pCtx->pOutput += pCtx->outputBytes;
}
pResInfo->numOfRes = pRes->numOfBins;
pResInfo->hasResult = DATA_SET_FLAG;
doFinalizer(pCtx); doFinalizer(pCtx);
} }
...@@ -5041,8 +5065,8 @@ int32_t functionCompatList[] = { ...@@ -5041,8 +5065,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, csum, mavg, sample, // tid_tag, deriv, csum, mavg, sample,
6, 8, -1, -1, -1, 6, 8, -1, -1, -1,
// block_info, elapsed // block_info,elapsed,histogram
7, 1 7, 1, -1
}; };
SAggFunctionInfo aAggs[40] = {{ SAggFunctionInfo aAggs[40] = {{
...@@ -5505,6 +5529,7 @@ SAggFunctionInfo aAggs[40] = {{ ...@@ -5505,6 +5529,7 @@ SAggFunctionInfo aAggs[40] = {{
elapsedRequired, elapsedRequired,
}, },
{ {
//38
"histogram", "histogram",
TSDB_FUNC_HISTOGRAM, TSDB_FUNC_HISTOGRAM,
TSDB_FUNC_HISTOGRAM, TSDB_FUNC_HISTOGRAM,
......
...@@ -37,7 +37,8 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo ...@@ -37,7 +37,8 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) { for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE) { pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE ||
pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM) {
return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64; return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册