提交 50b396c8 编写于 作者: D dapan1121

fix: fix case when issues

上级 8ab016fd
......@@ -1209,6 +1209,17 @@ void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1;
SDataType* pType = &pNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
pType->precision, pNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
} else if (type == QUERY_NODE_CASE_WHEN) {
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
SCaseWhenNode* pNode = (SCaseWhenNode*)pTargetNode->pExpr;
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1;
SDataType* pType = &pNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
pType->precision, pNode->node.aliasName);
......
......@@ -69,6 +69,9 @@ int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
#define GET_PARAM_PRECISON(_c) ((_c)->columnData->info.precision)
void sclFreeParam(SScalarParam *param);
void doVectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows,
int32_t _ord, int32_t optr);
void vectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr);
#ifdef __cplusplus
}
......
......@@ -76,6 +76,28 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out, int32_t* ov
return code;
}
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
SSDataBlock* pb = taosArrayGetP(pBlockList, 0);
SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam));
if (NULL == pLeft) {
sclError("calloc %d failed", (int32_t)sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLeft->numOfRows = pb->info.rows;
if (pDst->numOfRows < pb->info.rows) {
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);
}
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC);
taosMemoryFree(pLeft);
return TSDB_CODE_SUCCESS;
}
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
if (NULL == pObj) {
......@@ -549,11 +571,12 @@ int32_t sclGetNodeRes(SNode* node, SScalarCtx *ctx, SScalarParam **res) {
return TSDB_CODE_SUCCESS;
}
int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pCase, SScalarParam *pElse, SScalarParam *pComp, SScalarParam *output, int32_t rowIdx) {
int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pCase, SScalarParam *pElse, SScalarParam *pComp, SScalarParam *output, int32_t rowIdx, int32_t totalRows, bool *complete) {
SNode *node = NULL;
SWhenThenNode* pWhenThen = NULL;
SScalarParam *pWhen = NULL;
SScalarParam *pThen = NULL;
int32_t code = 0;
for (SListCell* cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) {
pWhenThen = (SWhenThenNode*)node;
......@@ -563,26 +586,46 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell*
doVectorCompare(pCase, pWhen, pComp, rowIdx, 1, TSDB_ORDER_ASC, OP_TYPE_EQUAL);
bool *equal = colDataGetData(pComp->columnData, rowIdx);
bool *equal = (bool*)colDataGetData(pComp->columnData, rowIdx);
if (*equal) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen, (pThen->numOfRows > 1 ? rowIdx : 0)));
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));
if (0 == rowIdx && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
return TSDB_CODE_SUCCESS;
goto _return;
}
}
if (pElse) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse, (pElse->numOfRows > 1 ? rowIdx : 0)));
return TSDB_CODE_SUCCESS;
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));
if (0 == rowIdx && 1 == pCase->numOfRows && 1 == pElse->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
goto _return;
}
colDataAppend(output->columnData, rowIdx, NULL, true);
return TSDB_CODE_SUCCESS;
if (0 == rowIdx && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
_return:
sclFreeParam(pWhen);
sclFreeParam(pThen);
SCL_RET(code);
}
int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pElse, SScalarParam *output, int32_t rowIdx) {
int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pElse, SScalarParam *output, int32_t rowIdx, int32_t totalRows, bool *complete) {
SNode *node = NULL;
SWhenThenNode* pWhenThen = NULL;
SScalarParam *pWhen = NULL;
......@@ -597,13 +640,14 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCe
SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen));
SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen));
bool *whenValue = colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? rowIdx : 0));
bool *whenValue = (bool*)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? rowIdx : 0));
if (*whenValue) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen, (pThen->numOfRows > 1 ? rowIdx : 0)));
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));
if (0 == rowIdx && 1 == pWhen->numOfRows) {
if (0 == rowIdx && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
goto _return;
......@@ -614,10 +658,11 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCe
}
if (pElse) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse, (pElse->numOfRows > 1 ? rowIdx : 0)));
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));
if (0 == rowIdx && 1 == pElse->numOfRows) {
if (0 == rowIdx && 1 == pElse->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
goto _return;
......@@ -625,8 +670,9 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCe
colDataAppend(output->columnData, rowIdx, NULL, true);
if (0 == rowIdx) {
if (0 == rowIdx && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
*complete = true;
}
_return:
......@@ -637,8 +683,6 @@ _return:
SCL_RET(code);
}
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
SScalarParam *params = NULL;
int32_t rowNum = 0;
......@@ -801,8 +845,9 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
SScalarParam *pElse = NULL;
SScalarParam *pWhen = NULL;
SScalarParam *pThen = NULL;
SScalarParam *pComp = NULL;
SScalarParam comp = {0};
int32_t rowNum = 1;
bool complete = false;
if (NULL == node->pWhenThenList || node->pWhenThenList->length <= 0) {
sclError("invalid whenThen list");
......@@ -812,6 +857,7 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
if (ctx->pBlockList) {
SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, 0);
rowNum = pb->info.rows;
output->numOfRows = pb->info.rows;
}
SCL_ERR_JRET(sclCreateColumnInfoData(&node->node.resType, rowNum, output));
......@@ -823,7 +869,7 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
compType.type = TSDB_DATA_TYPE_BOOL;
compType.bytes = tDataTypes[compType.type].bytes;
SCL_ERR_JRET(sclCreateColumnInfoData(&compType, rowNum, pComp));
SCL_ERR_JRET(sclCreateColumnInfoData(&compType, rowNum, &comp));
SNode* tnode = NULL;
SWhenThenNode* pWhenThen = (SWhenThenNode*)node->pWhenThenList->pHead->pNode;
......@@ -832,31 +878,44 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen));
if (pCase) {
vectorCompare(pCase, pWhen, pComp, TSDB_ORDER_ASC, OP_TYPE_EQUAL);
vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL);
for (int32_t i = 0; i < rowNum; ++i) {
bool *equal = colDataGetData(pComp->columnData, i);
bool *equal = (bool*)colDataGetData(comp.columnData, (comp.numOfRows > 1 ? i : 0));
if (*equal) {
colDataAppend(output->columnData, i, colDataGetData(pThen, (pThen->numOfRows > 1 ? i : 0)), colDataIsNull_s(pThen, (pThen->numOfRows > 1 ? i : 0)));
colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
if (0 == i && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
break;
}
} else {
SCL_ERR_JRET(sclWalkCaseWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pCase, pElse, pComp, output, i));
SCL_ERR_JRET(sclWalkCaseWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pCase, pElse, &comp, output, i, rowNum, &complete));
if (complete) {
break;
}
}
}
} else {
for (int32_t i = 0; i < rowNum; ++i) {
bool *equal = colDataGetData(pWhen->columnData, (pThen->numOfRows > 1 ? i : 0));
if (*equal) {
colDataAppend(output->columnData, i, colDataGetData(pThen, (pThen->numOfRows > 1 ? i : 0)), colDataIsNull_s(pThen, (pThen->numOfRows > 1 ? i : 0)));
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
bool *whenValue = (bool*)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? i : 0));
if (*whenValue) {
colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
if (0 == i && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
break;
}
} else {
SCL_ERR_JRET(sclWalkWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pElse, output, i));
SCL_ERR_JRET(sclWalkWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pElse, output, i, rowNum, &complete));
if (complete) {
break;
}
}
}
}
sclFreeParam(pCase);
sclFreeParam(pElse);
sclFreeParam(pComp);
sclFreeParam(&comp);
sclFreeParam(pWhen);
sclFreeParam(pThen);
......@@ -866,7 +925,7 @@ _return:
sclFreeParam(pCase);
sclFreeParam(pElse);
sclFreeParam(pComp);
sclFreeParam(&comp);
sclFreeParam(pWhen);
sclFreeParam(pThen);
sclFreeParam(output);
......@@ -1332,7 +1391,9 @@ EDealRes sclWalkCaseWhen(SNode* pNode, SScalarCtx *ctx) {
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)|| QUERY_NODE_LEFT_VALUE == nodeType(pNode)) {
if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode)
|| QUERY_NODE_COLUMN == nodeType(pNode) || QUERY_NODE_LEFT_VALUE == nodeType(pNode)
|| QUERY_NODE_WHEN_THEN == nodeType(pNode)) {
return DEAL_RES_CONTINUE;
}
......@@ -1362,25 +1423,6 @@ EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
return DEAL_RES_ERROR;
}
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
SSDataBlock* pb = taosArrayGetP(pBlockList, 0);
SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam));
if (NULL == pLeft) {
sclError("calloc %d failed", (int32_t)sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLeft->numOfRows = pb->info.rows;
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC);
taosMemoryFree(pLeft);
return TSDB_CODE_SUCCESS;
}
int32_t sclCalcConstants(SNode *pNode, bool dual, SNode **pRes) {
if (NULL == pNode) {
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......
......@@ -1715,10 +1715,9 @@ void doVectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO
fp = filterGetCompFuncEx(lType, rType, optr);
}
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
if (startIndex < 0) {
i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
compRows = pOut->numOfRows;
} else {
compRows = startIndex + numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册