提交 c9ba7e02 编写于 作者: D dapan1121

feature/qnode

上级 46b7eb0d
......@@ -24,6 +24,11 @@ extern "C" {
typedef struct SFilterInfo SFilterInfo;
typedef struct SFilterColumnParam{
int32_t numOfCols;
SArray* pDataBlock;
} SFilterColumnParam;
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes);
int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst);
......
......@@ -309,6 +309,7 @@ typedef struct SFilterInfo {
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SColumnRefNode *)((fi)->desc))->dataType.type)
#define FILTER_GET_COL_FIELD_SIZE(fi) (((SColumnRefNode *)((fi)->desc))->dataType.bytes)
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnRefNode *)((fi)->desc))->columnId)
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnRefNode *)((fi)->desc))->slotId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnRefNode *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SColumnRefNode *)((fi)->desc))->dataType.bytes * (ri))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((SValueNode *)((fi)->desc))->node.resType.type)
......@@ -370,4 +371,4 @@ extern __compar_fn_t filterGetCompFunc(int32_t type, int32_t optr);
}
#endif
#endif // TDENGINE_FILTER_INT_H
\ No newline at end of file
#endif // TDENGINE_FILTER_INT_H
......@@ -18,6 +18,7 @@
//#include "queryLog.h"
#include "tcompare.h"
#include "filterInt.h"
#include "filter.h"
OptrStr gOptrStr[] = {
{TSDB_RELATION_INVALID, "invalid"},
......@@ -1418,8 +1419,8 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
qDebug("COLUMN Field Num:%u", info->fields[FLD_TYPE_COLUMN].num);
for (uint32_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
SFilterField *field = &info->fields[FLD_TYPE_COLUMN].fields[i];
SSchema *sch = field->desc;
qDebug("COL%d => [%d][%s]", i, sch->colId, sch->name);
SColumnRefNode *refNode = (SColumnRefNode *)field->desc;
qDebug("COL%d => [%d][%d]", i, refNode->tupleId, refNode->slotId);
}
qDebug("VALUE Field Num:%u", info->fields[FLD_TYPE_VALUE].num);
......@@ -1446,9 +1447,9 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
char str[512] = {0};
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
SSchema *sch = left->desc;
SColumnRefNode *refNode = (SColumnRefNode *)left->desc;
if (unit->compare.optr >= TSDB_RELATION_INVALID && unit->compare.optr <= TSDB_RELATION_CONTAINS){
len = sprintf(str, "UNIT[%d] => [%d][%s] %s [", i, sch->colId, sch->name, gOptrStr[unit->compare.optr].str);
len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->tupleId, refNode->slotId, gOptrStr[unit->compare.optr].str);
}
if (unit->right.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != TSDB_RELATION_IN) {
......@@ -1467,7 +1468,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
if (unit->compare.optr2) {
strcat(str, " && ");
if (unit->compare.optr2 >= TSDB_RELATION_INVALID && unit->compare.optr2 <= TSDB_RELATION_CONTAINS){
sprintf(str + strlen(str), "[%d][%s] %s [", sch->colId, sch->name, gOptrStr[unit->compare.optr2].str);
sprintf(str + strlen(str), "[%d][%d] %s [", refNode->tupleId, refNode->slotId, gOptrStr[unit->compare.optr2].str);
}
if (unit->right2.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != TSDB_RELATION_IN) {
......@@ -1769,7 +1770,7 @@ _return:
FLT_RET(code);
}
int32_t filterInitValFieldData(SFilterInfo *info) {
int32_t fltInitValFieldData(SFilterInfo *info) {
for (uint32_t i = 0; i < info->unitNum; ++i) {
SFilterUnit* unit = &info->units[i];
if (unit->right.type != FLD_TYPE_VALUE) {
......@@ -1793,7 +1794,6 @@ int32_t filterInitValFieldData(SFilterInfo *info) {
if (unit->compare.optr == TSDB_RELATION_IN) {
FLT_ERR_RET(fltGenerateSetFromList((void **)&fi->data, fi->desc, type));
filterConvertSetFromBinary((void **)&fi->data, var->pz, var->nLen, type, false);
if (fi->data == NULL) {
fltError("failed to convert in param");
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -2973,47 +2973,6 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
return all;
}
static void doJsonCompare(SFilterComUnit *cunit, int8_t *result, void* colData){
if(cunit->optr == TSDB_RELATION_MATCH || cunit->optr == TSDB_RELATION_NMATCH){
uint8_t jsonType = *(char*)colData;
char* realData = POINTER_SHIFT(colData, CHAR_BYTES);
if (jsonType != TSDB_DATA_TYPE_NCHAR){
*result = false;
}else{
char *newColData = calloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
int len = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), varDataVal(newColData));
if (len < 0){
qError("castConvert1 taosUcs4ToMbs error");
tfree(newColData);
return;
}
varDataSetLen(newColData, len);
tVariant* val = cunit->valData;
char newValData[TSDB_REGEX_STRING_DEFAULT_LEN * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE] = {0};
assert(val->nLen <= TSDB_REGEX_STRING_DEFAULT_LEN * TSDB_NCHAR_SIZE);
memcpy(varDataVal(newValData), val->pz, val->nLen);
varDataSetLen(newValData, val->nLen);
*result = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, newValData);
tfree(newColData);
}
}else if(cunit->optr == TSDB_RELATION_LIKE){
uint8_t jsonType = *(char*)colData;
char* realData = POINTER_SHIFT(colData, CHAR_BYTES);
if (jsonType != TSDB_DATA_TYPE_NCHAR){
*result = false;
}else{
tVariant* val = cunit->valData;
char* newValData = calloc(val->nLen + VARSTR_HEADER_SIZE, 1);
memcpy(varDataVal(newValData), val->pz, val->nLen);
varDataSetLen(newValData, val->nLen);
*result = filterDoCompare(gDataCompare[cunit->func], cunit->optr, realData, newValData);
tfree(newValData);
}
}else{
*result = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
}
}
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SDataStatis *statis, int16_t numOfCols) {
SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true;
......@@ -3083,8 +3042,6 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
(*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData);
}
tfree(newColData);
}else if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){
doJsonCompare(&(info->cunits[uidx]), &(*p)[i], colData);
}else{
(*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData);
}
......@@ -3145,8 +3102,6 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData);
}
tfree(newColData);
}else if(cunit->dataType == TSDB_DATA_TYPE_JSON){
doJsonCompare(cunit, &(*p)[i], colData);
}else{
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
}
......@@ -3174,11 +3129,6 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
return all;
}
FORCE_INLINE bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
return (*info->func)(info, numOfRows, p, statis, numOfCols);
}
int32_t filterSetExecFunc(SFilterInfo *info) {
if (FILTER_ALL_RES(info)) {
info->func = filterExecuteImplAll;
......@@ -3258,7 +3208,8 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t filterSetColFieldData(SFilterInfo *info, void *param, filer_get_col_from_id fp) {
int32_t fltSetColFieldDataImpl(SFilterInfo *info, void *param, filer_get_col_from_id fp, bool fromColId) {
CHK_LRET(info == NULL, TSDB_CODE_QRY_APP_ERROR, "info NULL");
CHK_LRET(info->fields[FLD_TYPE_COLUMN].num <= 0, TSDB_CODE_QRY_APP_ERROR, "no column fileds");
......@@ -3269,7 +3220,11 @@ int32_t filterSetColFieldData(SFilterInfo *info, void *param, filer_get_col_from
for (uint32_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
SFilterField* fi = &info->fields[FLD_TYPE_COLUMN].fields[i];
(*fp)(param, FILTER_GET_COL_FIELD_ID(fi), &fi->data);
if (fromColId) {
(*fp)(param, FILTER_GET_COL_FIELD_ID(fi), &fi->data);
} else {
(*fp)(param, FILTER_GET_COL_FIELD_SLOT_ID(fi), &fi->data);
}
}
filterUpdateComUnits(info);
......@@ -3292,7 +3247,7 @@ int32_t fltInitFromNode(SNode* tree, SFilterInfo *info, uint32_t options) {
filterConvertGroupFromArray(info, group);
taosArrayDestroy(group);
FLT_ERR_JRET(filterInitValFieldData(info));
FLT_ERR_JRET(fltInitValFieldData(info));
if (!FILTER_GET_FLAG(info->options, FI_OPTION_NO_REWRITE)) {
filterDumpInfoToString(info, "Before preprocess", 0);
......@@ -3563,8 +3518,6 @@ int32_t filterFreeNcharColumns(SFilterInfo* info) {
return TSDB_CODE_SUCCESS;
}
##################################################################################
EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
SFltTreeStat *stat = (SFltTreeStat *)pContext;
......@@ -3641,11 +3594,21 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
node->pRight = t;
}
if (OP_TYPE_IN == node->opType || QUERY_NODE_NODE_LIST != nodeType(node->pRight)) {
fltError("failed to convert in param");
if (OP_TYPE_IN == node->opType && QUERY_NODE_NODE_LIST != nodeType(node->pRight)) {
fltError("invalid IN operator node, rightType:%d", nodeType(node->pRight));
stat->code = TSDB_CODE_QRY_APP_ERROR;
return DEAL_RES_ERROR;
}
}
if (OP_TYPE_IN != node->opType) {
SColumnRefNode *refNode = (SColumnRefNode *)node->pLeft;
SValueNode *valueNode = (SValueNode *)node->pRight;
int32_t type = vectorGetConvertType(refNode->dataType.type, valueNode->node.resType.type);
if (0 != type && type != refNode->dataType.type) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
}
}
return DEAL_RES_CONTINUE;
......@@ -3669,6 +3632,32 @@ int32_t fltOptimizeNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat)
}
int32_t filterGetDataFromColId(void *param, int32_t id, void **data) {
int32_t numOfCols = ((SFilterColumnParam *)param)->numOfCols;
SArray* pDataBlock = ((SFilterColumnParam *)param)->pDataBlock;
for (int32_t j = 0; j < numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, j);
if (id == pColInfo->info.colId) {
*data = pColInfo->pData;
break;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param, filer_get_col_from_id fp) {
return fltSetColFieldDataImpl(info, param, fp, false);
}
int32_t filterSetDataFromColId(SFilterInfo *info, void *param, filer_get_col_from_id fp) {
return fltSetColFieldDataImpl(info, param, fp, true);
}
int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options) {
int32_t code = 0;
SFilterInfo *info = NULL;
......
......@@ -515,6 +515,18 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
};
int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
if (type1 == type2) {
return 0;
}
if (type1 < type2) {
return gConvertTypes[type1][type2];
}
return gConvertTypes[type2][type1];
}
int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut) {
if (pLeft->type == pRight->type) {
return TSDB_CODE_SUCCESS;
......@@ -536,7 +548,8 @@ int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* p
paramOut2 = pLeftOut;
}
int8_t type = gConvertTypes[param1->type][param2->type];
int8_t type = vectorGetConvertType(param1->type, param2->type);
if (0 == type) {
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册