提交 87fefa9e 编写于 作者: G Ganlin Zhao

[TD-14241]: add string functions

上级 631f52be
......@@ -72,10 +72,15 @@ typedef enum EFunctionType {
FUNCTION_TYPE_ATAN,
// string function
FUNCTION_TYPE_CHAR_LENGTH = 1500,
FUNCTION_TYPE_LENGTH = 1500,
FUNCTION_TYPE_CHAR_LENGTH,
FUNCTION_TYPE_CONCAT,
FUNCTION_TYPE_CONCAT_WS,
FUNCTION_TYPE_LENGTH,
FUNCTION_TYPE_LOWER,
FUNCTION_TYPE_UPPER,
FUNCTION_TYPE_LTRIM,
FUNCTION_TYPE_RTRIM,
FUNCTION_TYPE_SUBSTR,
// conversion function
FUNCTION_TYPE_CAST = 2000,
......
......@@ -42,6 +42,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type);
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut);
/* Math functions */
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
......@@ -58,6 +59,18 @@ int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/*
 * String functions.
 * Every entry point takes an array of inputNum input parameters (pInput),
 * writes its result column to pOutput and returns a TSDB_CODE_* status.
 */
/* Writes the stored length (varDataLen) of each var-data input row. */
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Like lengthFunction, but for non-VARCHAR input the length is divided by TSDB_NCHAR_SIZE. */
int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Joins 2-8 var-data inputs of the same type; a row with a NULL input is marked NULL. */
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Joins inputs 1..inputNum-1 using input 0 as separator (3-9 inputs in total); NULL values are skipped. */
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Case conversion: forwards to doCaseConvFunction with tolower. */
int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Case conversion: forwards to doCaseConvFunction with toupper. */
int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Leading-whitespace trim: forwards to doTrimFunction with tltrim. */
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Trailing-whitespace trim: forwards to doTrimFunction with trtrim. */
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Substring: input 0 is the string, input 1 a non-zero position, optional input 2 a non-negative length. */
int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus
}
#endif
......
......@@ -282,6 +282,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = atanFunction,
.finalizeFunc = NULL
},
{
.name = "length",
.type = FUNCTION_TYPE_LENGTH,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = lengthFunction,
.finalizeFunc = NULL
},
{
.name = "char_length",
.type = FUNCTION_TYPE_CHAR_LENGTH,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = charLengthFunction,
.finalizeFunc = NULL
},
{
.name = "concat",
.type = FUNCTION_TYPE_CONCAT,
......@@ -289,7 +309,67 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = NULL,
.sprocessFunc = concatFunction,
.finalizeFunc = NULL
},
{
.name = "concat_ws",
.type = FUNCTION_TYPE_CONCAT_WS,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = concatWsFunction,
.finalizeFunc = NULL
},
{
.name = "lower",
.type = FUNCTION_TYPE_LOWER,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = lowerFunction,
.finalizeFunc = NULL
},
{
.name = "upper",
.type = FUNCTION_TYPE_UPPER,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = upperFunction,
.finalizeFunc = NULL
},
{
.name = "ltrim",
.type = FUNCTION_TYPE_LTRIM,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = ltrimFunction,
.finalizeFunc = NULL
},
{
.name = "rtrim",
.type = FUNCTION_TYPE_RTRIM,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = rtrimFunction,
.finalizeFunc = NULL
},
{
.name = "substr",
.type = FUNCTION_TYPE_SUBSTR,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = substrFunction,
.finalizeFunc = NULL
},
{
......
......@@ -47,7 +47,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out);
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows);
#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type)
#define GET_PARAM_BYTES(_c) ((_c)->pColumnInfoData->info.bytes)
#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes)
void sclFreeParam(SScalarParam *param);
......
......@@ -3,10 +3,14 @@
#include "sclInt.h"
#include "sclvector.h"
static void assignBasicParaInfo(struct SScalarParam* dst, const struct SScalarParam* src) {
// dst->type = src->type;
// dst->bytes = src->bytes;
// dst->num = src->num;
typedef float (*_float_fn)(float);
typedef double (*_double_fn)(double);
typedef double (*_double_fn_2)(double, double);
typedef int (*_conv_fn)(int);
typedef void (*_trim_fn)(char *, char*, int32_t, int32_t);
double tlog(double v, double base) {
return log(v) / log(base);
}
/** Math functions **/
......@@ -107,14 +111,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
return TSDB_CODE_SUCCESS;
}
typedef float (*_float_fn)(float);
typedef double (*_double_fn)(double);
typedef double (*_double_fn_2)(double, double);
double tlog(double v, double base) {
return log(v) / log(base);
}
int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
......@@ -216,6 +212,341 @@ int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* p
return TSDB_CODE_SUCCESS;
}
/** String functions **/
/**
 * LENGTH(): writes the stored length of every var-data input row.
 *
 * @param pInput   single var-data input column
 * @param inputNum number of inputs, must be 1
 * @param pOutput  result column; one int16_t per row, NULL rows are flagged
 *                 in the output null bitmap
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on a wrong argument count
 *         or a non var-data input type
 */
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  // Check the argument count before touching pInput.
  if (inputNum != 1) {
    return TSDB_CODE_FAILED;
  }

  int32_t type = GET_PARAM_TYPE(pInput);
  if (!IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;
  int16_t *out = (int16_t *)pOutputData->pData;

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }

    // Address the row the same way the other string functions in this file
    // do (base + row * column width) instead of treating pData as an array
    // of pointers.
    char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput);
    out[i] = varDataLen(in);
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}
/**
 * CHAR_LENGTH(): writes the number of characters of every var-data input row.
 * For VARCHAR this is the stored length; for any other var-data type (NCHAR)
 * the stored length is divided by TSDB_NCHAR_SIZE.
 *
 * @param pInput   single var-data input column
 * @param inputNum number of inputs, must be 1
 * @param pOutput  result column; one int16_t per row, NULL rows are flagged
 *                 in the output null bitmap
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on a wrong argument count
 *         or a non var-data input type
 */
int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  // Check the argument count before touching pInput.
  if (inputNum != 1) {
    return TSDB_CODE_FAILED;
  }

  int32_t type = GET_PARAM_TYPE(pInput);
  if (!IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;
  int16_t *out = (int16_t *)pOutputData->pData;

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }

    // Address the row the same way the other string functions in this file
    // do (base + row * column width) instead of treating pData as an array
    // of pointers.
    char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput);
    if (type == TSDB_DATA_TYPE_VARCHAR) {
      out[i] = varDataLen(in);
    } else {  // NCHAR
      out[i] = varDataLen(in) / TSDB_NCHAR_SIZE;
    }
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}
/**
 * CONCAT(): joins 2-8 var-data inputs of the same type, row by row.
 * A row is NULL in the output as soon as any of its inputs is NULL.
 *
 * @param pInput   array of inputNum var-data columns, all of the same type
 * @param inputNum number of inputs, 2..8
 * @param pOutput  result column, one var-data value per row
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on a bad argument count,
 *         mismatching/non var-data input types, or allocation failure
 */
int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum < 2 || inputNum > 8) {  // concat accepts 2-8 input strings
    return TSDB_CODE_FAILED;
  }

  // Validate before allocating so the error path has nothing to release.
  for (int32_t i = 0; i < inputNum; ++i) {
    if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
        GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
      return TSDB_CODE_FAILED;
    }
  }

  SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
  if (pInputData == NULL) {
    return TSDB_CODE_FAILED;
  }
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
  }
  SColumnInfoData *pOutputData = pOutput->columnData;

  for (int32_t k = 0; k < pInput->numOfRows; ++k) {
    // The flag is per row: a NULL in one row must not suppress later rows.
    bool hasNull = false;
    for (int32_t i = 0; i < inputNum; ++i) {
      if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) {
        colDataSetNull_f(pOutputData->nullbitmap, k);
        hasNull = true;
        break;
      }
    }
    if (hasNull) {
      continue;
    }

    // NOTE(review): nothing here checks that the joined length fits in one
    // output row (GET_PARAM_BYTES(pOutput)) -- confirm the caller sizes it.
    char   *out = pOutputData->pData + k * GET_PARAM_BYTES(pOutput);
    int16_t dataLen = 0;
    for (int32_t i = 0; i < inputNum; ++i) {
      char *in = pInputData[i]->pData + k * GET_PARAM_BYTES(&pInput[i]);
      memcpy(varDataVal(out) + dataLen, varDataVal(in), varDataLen(in));
      dataLen += varDataLen(in);
    }
    varDataSetLen(out, dataLen);
  }

  pOutput->numOfRows = pInput->numOfRows;
  taosMemoryFree(pInputData);
  return TSDB_CODE_SUCCESS;
}
/**
 * CONCAT_WS(): joins inputs 1..inputNum-1 with input 0 as the separator.
 * A NULL separator makes the row NULL; NULL values are skipped, and the
 * separator is written only between two values that are actually emitted.
 *
 * @param pInput   array of inputNum var-data columns of the same type;
 *                 pInput[0] is the separator
 * @param inputNum number of inputs including the separator, 3..9
 * @param pOutput  result column, one var-data value per row
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on a bad argument count,
 *         mismatching/non var-data input types, or allocation failure
 */
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum < 3 || inputNum > 9) {  // concat_ws accepts 3-9 inputs including the separator
    return TSDB_CODE_FAILED;
  }

  // Validate before allocating so the error path has nothing to release.
  for (int32_t i = 0; i < inputNum; ++i) {
    if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
        GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
      return TSDB_CODE_FAILED;
    }
  }

  SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
  if (pInputData == NULL) {
    return TSDB_CODE_FAILED;
  }
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
  }
  SColumnInfoData *pOutputData = pOutput->columnData;

  for (int32_t k = 0; k < pInput->numOfRows; ++k) {
    if (colDataIsNull_f(pInputData[0]->nullbitmap, k)) {
      colDataSetNull_f(pOutputData->nullbitmap, k);
      continue;
    }

    // Read the separator from the same row whose NULL flag was just tested,
    // using the same row addressing as the value columns below.
    char *sep = pInputData[0]->pData + k * GET_PARAM_BYTES(&pInput[0]);

    // NOTE(review): nothing here checks that the joined length fits in one
    // output row (GET_PARAM_BYTES(pOutput)) -- confirm the caller sizes it.
    char   *out = pOutputData->pData + k * GET_PARAM_BYTES(pOutput);
    int16_t dataLen = 0;
    bool    hasValue = false;
    for (int32_t i = 1; i < inputNum; ++i) {
      if (colDataIsNull_f(pInputData[i]->nullbitmap, k)) {
        continue;
      }

      // Emit the separator in front of every value but the first one, so a
      // trailing NULL argument cannot leave a dangling separator behind.
      if (hasValue) {
        memcpy(varDataVal(out) + dataLen, varDataVal(sep), varDataLen(sep));
        dataLen += varDataLen(sep);
      }

      char *in = pInputData[i]->pData + k * GET_PARAM_BYTES(&pInput[i]);
      memcpy(varDataVal(out) + dataLen, varDataVal(in), varDataLen(in));
      dataLen += varDataLen(in);
      hasValue = true;
    }
    varDataSetLen(out, dataLen);
  }

  pOutput->numOfRows = pInput->numOfRows;
  taosMemoryFree(pInputData);
  return TSDB_CODE_SUCCESS;
}
/**
 * Shared implementation of LOWER()/UPPER(): applies convFn to every
 * character of each var-data input row and stores the result with the same
 * length as the input.
 *
 * @param pInput   single var-data input column
 * @param inputNum number of inputs, must be 1
 * @param pOutput  result column; NULL rows are flagged in its null bitmap
 * @param convFn   <ctype.h>-style conversion (tolower / toupper)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on a wrong argument count
 *         or a non var-data input type
 */
int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) {
  // Check the argument count before touching pInput.
  if (inputNum != 1) {
    return TSDB_CODE_FAILED;
  }

  int32_t type = GET_PARAM_TYPE(pInput);
  if (!IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  // <ctype.h> functions are only defined for EOF and values representable as
  // unsigned char; this is the largest such value.
  const uint32_t narrowMax = (unsigned char)-1;

  SColumnInfoData *pInputData = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }

    // NOTE(review): the output row is addressed with the input column width,
    // as before -- confirm input and output columns share the same width.
    char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput);
    char *out = pOutputData->pData + i * GET_PARAM_BYTES(pInput);

    int32_t len = varDataLen(in);
    if (type == TSDB_DATA_TYPE_VARCHAR) {
      const char *src = varDataVal(in);
      char       *dst = varDataVal(out);
      for (int32_t j = 0; j < len; ++j) {
        // Go through unsigned char so bytes >= 0x80 are never passed as
        // negative values.
        dst[j] = (char)convFn((unsigned char)src[j]);
      }
    } else {  // NCHAR
      int32_t numOfChars = len / (int32_t)TSDB_NCHAR_SIZE;
      for (int32_t j = 0; j < numOfChars; ++j) {
        // The payload starts right after the length header, so it may not be
        // aligned for uint32_t: copy each code point in and out.
        uint32_t ch;
        memcpy(&ch, varDataVal(in) + (size_t)j * sizeof(ch), sizeof(ch));
        if (ch <= narrowMax) {
          ch = (uint32_t)convFn((int)ch);
        }  // wider code points are outside convFn's domain; keep them as is
        memcpy(varDataVal(out) + (size_t)j * sizeof(ch), &ch, sizeof(ch));
      }
    }
    varDataSetLen(out, len);
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}
/**
 * Copies one var-data value from input to output without its leading
 * whitespace and sets the output length accordingly.
 *
 * @param input   source var-data value (length header + payload)
 * @param output  destination var-data value
 * @param type    TSDB_DATA_TYPE_VARCHAR for byte strings; anything else is
 *                handled as NCHAR (32-bit characters)
 * @param charLen number of characters in the input payload
 */
void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
  int32_t numOfSpaces = 0;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    // isspace() is undefined for negative values, so look at the bytes as
    // unsigned char.
    const unsigned char *bytes = (const unsigned char *)varDataVal(input);
    while (numOfSpaces < charLen && isspace(bytes[numOfSpaces])) {
      numOfSpaces++;
    }
  } else {  // NCHAR
    while (numOfSpaces < charLen) {
      // The payload follows the length header and may not be aligned for
      // uint32_t, so copy the code point out instead of dereferencing a cast.
      uint32_t ch;
      memcpy(&ch, varDataVal(input) + (size_t)numOfSpaces * sizeof(ch), sizeof(ch));
      if (!iswspace(ch)) {
        break;
      }
      numOfSpaces++;
    }
  }

  int32_t charWidth = (type == TSDB_DATA_TYPE_VARCHAR) ? 1 : (int32_t)TSDB_NCHAR_SIZE;
  int32_t resLen = (charLen - numOfSpaces) * charWidth;
  memcpy(varDataVal(output), varDataVal(input) + numOfSpaces * charWidth, resLen);
  varDataSetLen(output, resLen);
}
/**
 * Copies one var-data value from input to output without its trailing
 * whitespace and sets the output length accordingly.
 *
 * @param input   source var-data value (length header + payload)
 * @param output  destination var-data value
 * @param type    TSDB_DATA_TYPE_VARCHAR for byte strings; anything else is
 *                handled as NCHAR (32-bit characters)
 * @param charLen number of characters in the input payload
 */
void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
  int32_t numOfSpaces = 0;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    // isspace() is undefined for negative values, so look at the bytes as
    // unsigned char.
    const unsigned char *bytes = (const unsigned char *)varDataVal(input);
    for (int32_t i = charLen - 1; i >= 0; --i) {
      if (!isspace(bytes[i])) {
        break;
      }
      numOfSpaces++;
    }
  } else {  // NCHAR
    // Walk backwards from the last character, exactly like the VARCHAR
    // branch; an empty string (charLen == 0) never enters the loop.
    for (int32_t i = charLen - 1; i >= 0; --i) {
      // The payload may not be aligned for uint32_t: copy the code point out.
      uint32_t ch;
      memcpy(&ch, varDataVal(input) + (size_t)i * sizeof(ch), sizeof(ch));
      if (!iswspace(ch)) {
        break;
      }
      numOfSpaces++;
    }
  }

  int32_t charWidth = (type == TSDB_DATA_TYPE_VARCHAR) ? 1 : (int32_t)TSDB_NCHAR_SIZE;
  int32_t resLen = (charLen - numOfSpaces) * charWidth;
  memcpy(varDataVal(output), varDataVal(input), resLen);
  varDataSetLen(output, resLen);
}
/**
 * Shared driver for LTRIM()/RTRIM(): runs trimFn on every non-NULL row of a
 * single var-data input column.
 *
 * @param pInput   single var-data input column
 * @param inputNum number of inputs, must be 1
 * @param pOutput  result column; NULL rows are flagged in its null bitmap
 * @param trimFn   per-value trimmer, called as trimFn(in, out, type, charLen)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on a wrong argument count
 *         or a non var-data input type
 */
int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) {
  const int32_t colType = GET_PARAM_TYPE(pInput);
  if (inputNum != 1) {
    return TSDB_CODE_FAILED;
  }
  if (!IS_VAR_DATA_TYPE(colType)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *src = pInput->columnData;
  SColumnInfoData *dst = pOutput->columnData;

  int32_t row = 0;
  while (row < pInput->numOfRows) {
    if (!colDataIsNull_f(src->nullbitmap, row)) {
      // Input and output rows are both located with the input column width.
      char *srcRow = src->pData + row * GET_PARAM_BYTES(pInput);
      char *dstRow = dst->pData + row * GET_PARAM_BYTES(pInput);

      int32_t byteLen = varDataLen(srcRow);
      int32_t numOfChars = byteLen;
      if (colType != TSDB_DATA_TYPE_VARCHAR) {
        numOfChars = byteLen / TSDB_NCHAR_SIZE;
      }
      trimFn(srcRow, dstRow, colType, numOfChars);
    } else {
      colDataSetNull_f(dst->nullbitmap, row);
    }
    ++row;
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}
/**
 * SUBSTR(str, pos [, len]): extracts a substring from every row of str.
 *
 * pos is 1-based when positive and counts back from the end of the string
 * when negative; 0 is rejected. len, when given, is the maximum number of
 * characters to return and must not be negative. pos and len are read once
 * from the first value of their columns and applied to every row.
 *
 * @param pInput   pInput[0] var-data string column, pInput[1] position,
 *                 optional pInput[2] length
 * @param inputNum number of inputs, 2 or 3
 * @param pOutput  result column; NULL rows are flagged in its null bitmap
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on invalid arguments
 */
int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  // Both 2 and 3 arguments are valid ("!= 2 || != 3" rejected everything).
  if (inputNum != 2 && inputNum != 3) {
    return TSDB_CODE_FAILED;
  }

  int32_t type = GET_PARAM_TYPE(pInput);
  if (!IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  // Width of one character in bytes, as a signed value so that arithmetic
  // with a negative position stays signed.
  const int32_t charWidth = (type == TSDB_DATA_TYPE_VARCHAR) ? 1 : (int32_t)TSDB_NCHAR_SIZE;

  int32_t subPos = 0;
  GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
  if (subPos == 0) {  // subPos needs to be a positive or negative value
    return TSDB_CODE_FAILED;
  }
  // A var-data value never exceeds INT16_MAX bytes, so clamping here changes
  // no result but keeps the multiplications below from overflowing.
  subPos = MAX(MIN(subPos, INT16_MAX), -INT16_MAX);

  int32_t subLen = INT16_MAX;
  if (inputNum == 3) {
    GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
    if (subLen < 0) {  // subLen cannot be negative
      return TSDB_CODE_FAILED;
    }
    subLen = MIN(subLen, INT16_MAX) * charWidth;
  }

  SColumnInfoData *pInputData = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

  // Iterate over the input rows; pOutput->numOfRows is only set at the end.
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }

    // NOTE(review): the output row is addressed with the input column width,
    // as before -- confirm input and output columns share the same width.
    char *in = pInputData->pData + i * GET_PARAM_BYTES(pInput);
    char *out = pOutputData->pData + i * GET_PARAM_BYTES(pInput);

    int32_t len = varDataLen(in);
    int32_t startPosBytes;
    if (subPos > 0) {
      startPosBytes = (subPos - 1) * charWidth;
      startPosBytes = MIN(startPosBytes, len);
    } else {
      startPosBytes = len + subPos * charWidth;
      startPosBytes = MAX(startPosBytes, 0);
    }

    // Per-row result length: the requested length must not be overwritten,
    // otherwise a short row would shorten every row after it.
    int32_t resLen = MIN(subLen, len - startPosBytes);
    if (resLen > 0) {
      memcpy(varDataVal(out), varDataVal(in) + startPosBytes, resLen);
    }
    varDataSetLen(out, resLen);
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}
/* ATAN(): forwards to doScalarFunctionUnique with atan as the value function. */
int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  const int32_t code = doScalarFunctionUnique(pInput, inputNum, pOutput, atan);
  return code;
}
......@@ -264,57 +595,20 @@ int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut
return doScalarFunction(pInput, inputNum, pOutput, roundf, round);
}
static void tlength(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
assert(numOfInput == 1);
#if 0
int64_t* out = (int64_t*) pOutput->data;
char* s = pLeft->data;
for(int32_t i = 0; i < pLeft->num; ++i) {
out[i] = varDataLen(POINTER_SHIFT(s, i * pLeft->bytes));
}
#endif
/* LOWER(): runs the shared case-conversion routine with tolower. */
int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  const int32_t code = doCaseConvFunction(pInput, inputNum, pOutput, tolower);
  return code;
}
static void tconcat(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
assert(numOfInput > 0);
#if 0
int32_t rowLen = 0;
int32_t num = 1;
for(int32_t i = 0; i < numOfInput; ++i) {
rowLen += pLeft[i].bytes;
if (pLeft[i].num > 1) {
num = pLeft[i].num;
}
}
pOutput->data = taosMemoryRealloc(pOutput->data, rowLen * num);
assert(pOutput->data);
char* rstart = pOutput->data;
for(int32_t i = 0; i < num; ++i) {
char* s = rstart;
varDataSetLen(s, 0);
for (int32_t j = 0; j < numOfInput; ++j) {
char* p1 = POINTER_SHIFT(pLeft[j].data, i * pLeft[j].bytes);
memcpy(varDataVal(s) + varDataLen(s), varDataVal(p1), varDataLen(p1));
varDataLen(s) += varDataLen(p1);
}
rstart += rowLen;
}
#endif
/* UPPER(): runs the shared case-conversion routine with toupper. */
int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  const int32_t code = doCaseConvFunction(pInput, inputNum, pOutput, toupper);
  return code;
}
static void tltrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
/* LTRIM(): runs the shared trim driver with tltrim as the per-value trimmer. */
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  const int32_t code = doTrimFunction(pInput, inputNum, pOutput, tltrim);
  return code;
}
static void trtrim(SScalarParam* pOutput, size_t numOfInput, const SScalarParam *pLeft) {
/* RTRIM(): runs the shared trim driver with trtrim as the per-value trimmer. */
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  const int32_t code = doTrimFunction(pInput, inputNum, pOutput, trtrim);
  return code;
}
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册