Commit 6f3be9fe, authored by shenglian zhou

support concat with more than two columns

Parent 67873662
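With this change CONCAT accepts two or more string arguments instead of exactly two; for example (illustrative names only), a query such as SELECT CONCAT(c1, c2, c3) FROM tb1 now passes validation. The commit also adds a LENGTH scalar function that returns the stored length (varDataLen) of a BINARY/NCHAR value.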
@@ -50,7 +50,8 @@ struct SSchema;
 #define TSDB_FUNC_SCALAR_POW (TSDB_FUNC_FLAG_SCALAR | 0x0000)
 #define TSDB_FUNC_SCALAR_LOG (TSDB_FUNC_FLAG_SCALAR | 0x0001)
 #define TSDB_FUNC_SCALAR_CONCAT (TSDB_FUNC_FLAG_SCALAR | 0x0002)
-#define TSDB_FUNC_SCALAR_MAX_NUM 3
+#define TSDB_FUNC_SCALAR_LENGTH (TSDB_FUNC_FLAG_SCALAR | 0x0003)
+#define TSDB_FUNC_SCALAR_MAX_NUM 4
 #define TSDB_FUNC_SCALAR_NAME_MAX_LEN 16
 
 typedef struct {
@@ -60,37 +60,79 @@ int32_t exprTreeValidateFunctionNode(tExprNode *pExpr) {
       break;
     }
     case TSDB_FUNC_SCALAR_CONCAT: {
-      if (pExpr->_func.numChildren != 2) {
+      if (pExpr->_func.numChildren < 2) {
         return TSDB_CODE_TSC_INVALID_OPERATION;
       }
-      tExprNode *child1 = pExpr->_func.pChildren[0];
-      tExprNode *child2 = pExpr->_func.pChildren[1];
-      if (child1->nodeType == TSQL_NODE_VALUE) {
-        if (!IS_VAR_DATA_TYPE(child1->resultType)) {
-          return TSDB_CODE_TSC_INVALID_OPERATION;
+      int16_t prevResultType = TSDB_DATA_TYPE_NULL;
+      int16_t resultType = TSDB_DATA_TYPE_NULL;
+      bool resultTypeAlreadySet = false;
+      for (int32_t i = 0; i < pExpr->_func.numChildren; ++i) {
+        tExprNode *child = pExpr->_func.pChildren[i];
+        if (child->nodeType != TSQL_NODE_VALUE) {
+          resultType = child->resultType;
+          if (!IS_VAR_DATA_TYPE(resultType)) {
+            return TSDB_CODE_TSC_INVALID_OPERATION;
+          }
+          if (!resultTypeAlreadySet) {
+            resultTypeAlreadySet = true;
+          } else {
+            if (resultType != prevResultType) {
+              return TSDB_CODE_TSC_INVALID_OPERATION;
+            }
+          }
+          prevResultType = child->resultType;
        }
-        tVariantTypeSetType(child1->pVal, (char)(child1->resultType));
-        child1->resultType = (int16_t)child2->pVal->nType;
-        child1->resultBytes = (int16_t)(child2->pVal->nLen + VARSTR_HEADER_SIZE);
      }
-      if (child2->nodeType == TSQL_NODE_VALUE) {
-        if (!IS_VAR_DATA_TYPE(child2->pVal->nType)) {
-          return TSDB_CODE_TSC_INVALID_OPERATION;
+      if (resultTypeAlreadySet) {
+        for (int32_t i = 0; i < pExpr->_func.numChildren; ++i) {
+          tExprNode *child = pExpr->_func.pChildren[i];
+          if (child->nodeType == TSQL_NODE_VALUE) {
+            if (!IS_VAR_DATA_TYPE(child->pVal->nType)) {
+              return TSDB_CODE_TSC_INVALID_OPERATION;
+            }
+            tVariantTypeSetType(child->pVal, (char)resultType);
+            child->resultType = resultType;
+            child->resultBytes = (int16_t)(child->pVal->nLen + VARSTR_HEADER_SIZE);
+          }
        }
-        tVariantTypeSetType(child2->pVal, (char)(child1->resultType));
-        child2->resultType = (int16_t)child2->pVal->nType;
-        child2->resultBytes = (int16_t)(child2->pVal->nLen + VARSTR_HEADER_SIZE);
+      } else {
+        return TSDB_CODE_TSC_INVALID_OPERATION;
      }
-      if (child1->resultType != child2->resultType || (!IS_VAR_DATA_TYPE(child1->resultType))) {
+      pExpr->resultType = resultType;
+      int16_t resultBytes = 0;
+      for (int32_t i = 0; i < pExpr->_func.numChildren; ++i) {
+        tExprNode *child = pExpr->_func.pChildren[i];
+        if (resultBytes <= resultBytes + child->resultBytes - VARSTR_HEADER_SIZE) {
+          resultBytes += child->resultBytes - VARSTR_HEADER_SIZE;
+        } else {
+          return TSDB_CODE_TSC_INVALID_OPERATION;
+        }
+      }
+      pExpr->resultBytes = resultBytes + VARSTR_HEADER_SIZE;
+      break;
+    }
+    case TSDB_FUNC_SCALAR_LENGTH: {
+      if (pExpr->_func.numChildren != 1) {
        return TSDB_CODE_TSC_INVALID_OPERATION;
      }
-      pExpr->resultType = child1->resultType;
-      pExpr->resultBytes = child1->resultBytes + child2->resultBytes - VARSTR_HEADER_SIZE;
+      tExprNode* child1 = pExpr->_func.pChildren[0];
+      if (child1->nodeType == TSQL_NODE_VALUE) {
+        if (!IS_VAR_DATA_TYPE(child1->pVal->nType)) {
+          return TSDB_CODE_TSC_INVALID_OPERATION;
+        }
+        tVariantTypeSetType(child1->pVal, (char)(child1->resultType));
+        child1->resultType = (int16_t)child1->pVal->nType;
+        child1->resultBytes = (int16_t)(child1->pVal->nLen + VARSTR_HEADER_SIZE);
+      }
+      if (!IS_VAR_DATA_TYPE(child1->resultType))
      break;
    }
    default:
      break;
  }
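
The validation above declares the CONCAT output column width as the sum of the children's payload widths plus a single var-string header, accumulating into the int16_t resultBytes field. A minimal standalone sketch of that size computation follows; the 2-byte VARSTR_HEADER_SIZE value is an assumption about the var-data header, and the explicit INT16_MAX guard is a simplification rather than a copy of the patch's exact check.

    #include <stdint.h>
    #include <stdio.h>

    #define VARSTR_HEADER_SIZE 2   /* assumed: BINARY/NCHAR values carry a 2-byte length prefix */

    /* Declared width of CONCAT(child_0, ..., child_{n-1}): sum the payload widths
     * (declared width minus one header each) and add a single header back.
     * Returns -1 if the total would not fit the int16_t resultBytes field. */
    static int32_t concatResultBytes(const int16_t *childBytes, int32_t numChildren) {
      int32_t payloadTotal = 0;
      for (int32_t i = 0; i < numChildren; ++i) {
        payloadTotal += childBytes[i] - VARSTR_HEADER_SIZE;
        if (payloadTotal + VARSTR_HEADER_SIZE > INT16_MAX) {
          return -1;
        }
      }
      return payloadTotal + VARSTR_HEADER_SIZE;
    }

    int main(void) {
      /* e.g. children declared as BINARY(10), BINARY(20), BINARY(5); stored widths include the header */
      int16_t childBytes[] = {10 + VARSTR_HEADER_SIZE, 20 + VARSTR_HEADER_SIZE, 5 + VARSTR_HEADER_SIZE};
      printf("%d\n", concatResultBytes(childBytes, 3));   /* prints 37 = 10 + 20 + 5 + 2 */
      return 0;
    }
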
@@ -936,38 +978,66 @@ void vectorLog(int16_t functionId, tExprOperandInfo* pInputs, uint8_t numInputs,
 }
 
 void vectorConcat(int16_t functionId, tExprOperandInfo* pInputs, uint8_t numInputs, tExprOperandInfo* pOutput, int32_t order) {
-  assert(functionId == TSDB_FUNC_SCALAR_CONCAT && numInputs == 2 && order == TSDB_ORDER_ASC);
-  assert(pInputs[0].numOfRows >= 1 && pInputs[1].numOfRows >= 1);
-  assert(pOutput->numOfRows >= MAX(pInputs[0].numOfRows, pInputs[1].numOfRows));
-  char* data0 = NULL;
-  char* data1 = NULL;
-  char* outputData = NULL;
+  assert(functionId == TSDB_FUNC_SCALAR_CONCAT && numInputs >= 2 && order == TSDB_ORDER_ASC);
+  for (int i = 0; i < numInputs; ++i) {
+    assert(pInputs[i].numOfRows == 1 || pInputs[i].numOfRows == pOutput->numOfRows);
+  }
+  char* outputData = NULL;
+  char** inputData = calloc(numInputs, sizeof(char*));
   for (int i = 0; i < pOutput->numOfRows; ++i) {
-    if (pInputs[0].numOfRows == 1 && pInputs[1].numOfRows == 1) {
-      data0 = pInputs[0].data;
-      data1 = pInputs[1].data;
-    } else if (pInputs[0].numOfRows == 1){
+    for (int j = 0; j < numInputs; ++j) {
+      if (pInputs[j].numOfRows == 1) {
+        inputData[j] = pInputs[j].data;
+      } else {
+        inputData[j] = pInputs[j].data + i * pInputs[j].bytes;
+      }
+    }
+    outputData = pOutput->data + i * pOutput->bytes;
+    bool hasNullInputs = false;
+    for (int j = 0; j < numInputs; ++j) {
+      if (isNull(inputData[j], pInputs[j].type)) {
+        hasNullInputs = true;
+        setNull(outputData, pOutput->type, pOutput->bytes);
+      }
+    }
+    if (!hasNullInputs) {
+      int16_t dataLen = 0;
+      for (int j = 0; j < numInputs; ++j) {
+        memcpy(varDataVal(outputData)+dataLen, varDataVal(inputData[j]), varDataLen(inputData[j]));
+        dataLen += varDataLen(inputData[j]);
+      }
+      varDataSetLen(outputData, dataLen);
+    }
+  }
+  free(inputData);
+}
+
+void vectorLength(int16_t functionId, tExprOperandInfo *pInputs, uint8_t numInputs, tExprOperandInfo* pOutput, int32_t order) {
+  assert(functionId == TSDB_FUNC_SCALAR_LENGTH && numInputs == 1 && order == TSDB_ORDER_ASC);
+  char* data0 = NULL;
+  char* outputData = NULL;
+  for (int32_t i = 0; i < pOutput->numOfRows; ++i) {
+    if (pInputs[0].numOfRows == 1) {
      data0 = pInputs[0].data;
-      data1 = pInputs[1].data + i * pInputs[1].bytes;
-    } else if (pInputs[1].numOfRows == 1){
-      data0 = pInputs[0].data + i * pInputs[0].bytes;
-      data1 = pInputs[1].data;
    } else {
      data0 = pInputs[0].data + i * pInputs[0].bytes;
-      data1 = pInputs[1].data + i * pInputs[1].bytes;
    }
    outputData = pOutput->data + i * pOutput->bytes;
-    if (isNull(data0, pInputs[0].type) || isNull(data1, pInputs[1].type)) {
+    if (isNull(data0, pInputs[0].type)) {
      setNull(outputData, pOutput->type, pOutput->bytes);
    } else {
-      varDataSetLen(outputData, varDataLen(data0)+ varDataLen(data1));
-      memcpy(varDataVal(outputData), varDataVal(data0), varDataLen(data0));
-      memcpy(varDataVal(outputData)+ varDataLen(data0), varDataVal(data1), varDataLen(data1));
+      int16_t result = varDataLen(data0);
+      SET_TYPED_DATA(outputData, pOutput->type, result);
    }
  }
 }
 
 _expr_scalar_function_t getExprScalarFunction(uint16_t funcId) {
   assert(TSDB_FUNC_IS_SCALAR(funcId));
   int16_t scalaIdx = TSDB_FUNC_SCALAR_INDEX(funcId);
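
vectorConcat and vectorLength above lean on two conventions: an input whose numOfRows is 1 is a constant that is broadcast to every output row, and BINARY/NCHAR cells are var-data blobs, i.e. a length prefix followed by the payload that varDataLen/varDataVal/varDataSetLen read and write. The sketch below shows how one output row is assembled under those assumptions; the varData* helpers here are simplified stand-ins (a 2-byte int16_t prefix), not the TDengine macros.

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Simplified var-data layout (assumption): a 2-byte length prefix, then the payload. */
    typedef int16_t VarLenT;
    #define varDataLen(p)       (*(VarLenT *)(p))
    #define varDataVal(p)       ((char *)(p) + sizeof(VarLenT))
    #define varDataSetLen(p, l) (*(VarLenT *)(p) = (VarLenT)(l))

    /* Concatenate numInputs var-data cells into one output cell, the way one row
     * of the vectorized CONCAT is assembled: copy each payload, then set the header. */
    static void concatRow(char **inputCells, int numInputs, char *outputCell) {
      VarLenT total = 0;
      for (int j = 0; j < numInputs; ++j) {
        memcpy(varDataVal(outputCell) + total, varDataVal(inputCells[j]), (size_t)varDataLen(inputCells[j]));
        total = (VarLenT)(total + varDataLen(inputCells[j]));
      }
      varDataSetLen(outputCell, total);
    }

    int main(void) {
      /* int16_t-backed buffers keep the 2-byte header access aligned */
      int16_t abuf[8], bbuf[8], cbuf[8], outbuf[32];
      char *a = (char *)abuf, *b = (char *)bbuf, *c = (char *)cbuf, *out = (char *)outbuf;

      varDataSetLen(a, 3); memcpy(varDataVal(a), "foo", 3);
      varDataSetLen(b, 1); memcpy(varDataVal(b), "-", 1);
      varDataSetLen(c, 3); memcpy(varDataVal(c), "bar", 3);

      char *cells[] = {a, b, c};        /* three inputs, as the patch now allows */
      concatRow(cells, 3, out);

      assert(varDataLen(out) == 7);
      printf("%.*s\n", (int)varDataLen(out), varDataVal(out));  /* prints foo-bar */
      return 0;
    }

vectorLength follows the same broadcast rule but, instead of copying payload bytes, writes varDataLen(data0) into a numeric output cell via SET_TYPED_DATA.
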
@@ -991,4 +1061,9 @@ tScalarFunctionInfo aScalarFunctions[] = {
     "concat",
     vectorConcat
   },
+  {
+    TSDB_FUNC_SCALAR_LENGTH,
+    "length",
+    vectorLength
+  }
 };