提交 e37bdbd5 编写于 作者: G Ganlin Zhao

feat(query): add histogram function scalar version

TD-17344
上级 bcb7cb40
......@@ -116,6 +116,7 @@ int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus
}
......
......@@ -2331,6 +2331,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getHistogramFuncEnv,
.initFunc = histogramFunctionSetup,
.processFunc = histogramFunction,
.sprocessFunc = histogramScalarFunction,
.finalizeFunc = histogramFinalize,
.invertFunc = NULL,
.combineFunc = histogramCombine,
......
......@@ -2601,3 +2601,218 @@ int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SSca
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
typedef enum { UNKNOWN_BIN = 0, USER_INPUT_BIN, LINEAR_BIN, LOG_BIN } EHistoBinType;
static int8_t getHistogramBinType(char* binTypeStr) {
int8_t binType;
if (strcasecmp(binTypeStr, "user_input") == 0) {
binType = USER_INPUT_BIN;
} else if (strcasecmp(binTypeStr, "linear_bin") == 0) {
binType = LINEAR_BIN;
} else if (strcasecmp(binTypeStr, "log_bin") == 0) {
binType = LOG_BIN;
} else {
binType = UNKNOWN_BIN;
}
return binType;
}
typedef struct SHistoFuncBin {
double lower;
double upper;
int64_t count;
double percentage;
} SHistoFuncBin;
static bool getHistogramBinDesc(SHistoFuncBin** bins, int32_t* binNum, char* binDescStr, int8_t binType, bool normalized) {
cJSON* binDesc = cJSON_Parse(binDescStr);
int32_t numOfBins;
double* intervals;
if (cJSON_IsObject(binDesc)) { /* linaer/log bins */
int32_t numOfParams = cJSON_GetArraySize(binDesc);
int32_t startIndex;
if (numOfParams != 4) {
return false;
}
cJSON* start = cJSON_GetObjectItem(binDesc, "start");
cJSON* factor = cJSON_GetObjectItem(binDesc, "factor");
cJSON* width = cJSON_GetObjectItem(binDesc, "width");
cJSON* count = cJSON_GetObjectItem(binDesc, "count");
cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity");
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
return false;
}
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
return false;
}
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
return false;
}
int32_t counter = (int32_t)count->valueint;
if (infinity->valueint == false) {
startIndex = 0;
numOfBins = counter + 1;
} else {
startIndex = 1;
numOfBins = counter + 3;
}
intervals = taosMemoryCalloc(numOfBins, sizeof(double));
if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) {
// linear bin process
if (width->valuedouble == 0) {
taosMemoryFree(intervals);
return false;
}
for (int i = 0; i < counter + 1; ++i) {
intervals[startIndex] = start->valuedouble + i * width->valuedouble;
if (isinf(intervals[startIndex])) {
taosMemoryFree(intervals);
return false;
}
startIndex++;
}
} else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) {
// log bin process
if (start->valuedouble == 0) {
taosMemoryFree(intervals);
return false;
}
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
taosMemoryFree(intervals);
return false;
}
for (int i = 0; i < counter + 1; ++i) {
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
if (isinf(intervals[startIndex])) {
taosMemoryFree(intervals);
return false;
}
startIndex++;
}
} else {
taosMemoryFree(intervals);
return false;
}
if (infinity->valueint == true) {
intervals[0] = -INFINITY;
intervals[numOfBins - 1] = INFINITY;
// in case of desc bin orders, -inf/inf should be swapped
ASSERT(numOfBins >= 4);
if (intervals[1] > intervals[numOfBins - 2]) {
TSWAP(intervals[0], intervals[numOfBins - 1]);
}
}
} else if (cJSON_IsArray(binDesc)) { /* user input bins */
if (binType != USER_INPUT_BIN) {
return false;
}
numOfBins = cJSON_GetArraySize(binDesc);
intervals = taosMemoryCalloc(numOfBins, sizeof(double));
cJSON* bin = binDesc->child;
if (bin == NULL) {
taosMemoryFree(intervals);
return false;
}
int i = 0;
while (bin) {
intervals[i] = bin->valuedouble;
if (!cJSON_IsNumber(bin)) {
taosMemoryFree(intervals);
return false;
}
if (i != 0 && intervals[i] <= intervals[i - 1]) {
taosMemoryFree(intervals);
return false;
}
bin = bin->next;
i++;
}
} else {
return false;
}
*binNum = numOfBins - 1;
*bins = taosMemoryCalloc(numOfBins, sizeof(SHistoFuncBin));
for (int32_t i = 0; i < *binNum; ++i) {
(*bins)[i].lower = intervals[i] < intervals[i + 1] ? intervals[i] : intervals[i + 1];
(*bins)[i].upper = intervals[i + 1] > intervals[i] ? intervals[i + 1] : intervals[i];
(*bins)[i].count = 0;
}
taosMemoryFree(intervals);
return true;
}
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
SHistoFuncBin *bins;
int32_t numOfBins = 0;
int32_t totalCount = 0;
int8_t binType = getHistogramBinType(varDataVal(pInput[1].columnData->pData));
char* binDesc = varDataVal(pInput[2].columnData->pData);
int64_t normalized = *(int64_t *)(pInput[3].columnData->pData);
int32_t type = GET_PARAM_TYPE(pInput);
if (!getHistogramBinDesc(&bins, &numOfBins, binDesc, binType, (bool)normalized)) {
return TSDB_CODE_FAILED;
}
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
continue;
}
char* data = colDataGetData(pInputData, i);
double v;
GET_TYPED_DATA(v, double, type, data);
for (int32_t k = 0; k < numOfBins; ++k) {
if (v > bins[k].lower && v <= bins[k].upper) {
bins[k].count++;
totalCount++;
break;
}
}
}
if (normalized) {
for (int32_t k = 0; k < numOfBins; ++k) {
if (totalCount != 0) {
bins[k].percentage = bins[k].count / (double)totalCount;
} else {
bins[k].percentage = 0;
}
}
}
for (int32_t k = 0; k < numOfBins; ++k) {
int32_t len;
char buf[512] = {0};
if (!normalized) {
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}",
bins[k].lower, bins[k].upper, bins[k].count);
} else {
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
bins[k].lower, bins[k].upper, bins[k].percentage);
}
varDataSetLen(buf, len);
colDataAppend(pOutputData, k, buf, false);
}
taosMemoryFree(bins);
pOutput->numOfRows = numOfBins;
return TSDB_CODE_SUCCESS;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册