提交 46b7eb0d 编写于 作者: D dapan

feature/qnode

上级 8d413f09
...@@ -223,6 +223,7 @@ typedef struct SFilterPCtx { ...@@ -223,6 +223,7 @@ typedef struct SFilterPCtx {
typedef struct SFltTreeStat { typedef struct SFltTreeStat {
int32_t code; int32_t code;
bool scalarMode;
} SFltTreeStat; } SFltTreeStat;
typedef struct SFltScalarCtx { typedef struct SFltScalarCtx {
...@@ -287,6 +288,7 @@ typedef struct SFilterInfo { ...@@ -287,6 +288,7 @@ typedef struct SFilterInfo {
#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0) #define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0)
#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0) #define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0)
#define FLT_IS_COMPARISON_OPERATOR(_op) ((_op) >= OP_TYPE_GREATER_THAN && (_op) < OP_TYPE_IS_NOT_NULL)
#define fltFatal(...) qFatal(__VA_ARGS__) #define fltFatal(...) qFatal(__VA_ARGS__)
#define fltError(...) qError(__VA_ARGS__) #define fltError(...) qError(__VA_ARGS__)
...@@ -304,12 +306,12 @@ typedef struct SFilterInfo { ...@@ -304,12 +306,12 @@ typedef struct SFilterInfo {
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx])) #define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx])) #define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SSchema *)((fi)->desc))->type) #define FILTER_GET_COL_FIELD_TYPE(fi) (((SColumnRefNode *)((fi)->desc))->dataType.type)
#define FILTER_GET_COL_FIELD_SIZE(fi) (((SSchema *)((fi)->desc))->bytes) #define FILTER_GET_COL_FIELD_SIZE(fi) (((SColumnRefNode *)((fi)->desc))->dataType.bytes)
#define FILTER_GET_COL_FIELD_ID(fi) (((SSchema *)((fi)->desc))->colId) #define FILTER_GET_COL_FIELD_ID(fi) (((SColumnRefNode *)((fi)->desc))->columnId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SSchema *)((fi)->desc)) #define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnRefNode *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri)) #define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SColumnRefNode *)((fi)->desc))->dataType.bytes * (ri))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType) #define FILTER_GET_VAL_FIELD_TYPE(fi) (((SValueNode *)((fi)->desc))->node.resType.type)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data) #define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc) #define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc)
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX) #define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
......
...@@ -39,27 +39,6 @@ OptrStr gOptrStr[] = { ...@@ -39,27 +39,6 @@ OptrStr gOptrStr[] = {
{TSDB_RELATION_CONTAINS, "contains"}, {TSDB_RELATION_CONTAINS, "contains"},
}; };
static FORCE_INLINE int32_t filterFieldColDescCompare(const void *desc1, const void *desc2) {
const SSchema *sch1 = desc1;
const SSchema *sch2 = desc2;
return !(strcmp(sch1->name, sch2->name) == 0 && sch1->colId == sch2->colId);
}
static FORCE_INLINE int32_t filterFieldValDescCompare(const void *desc1, const void *desc2) {
const SVariant *val1 = desc1;
const SVariant *val2 = desc2;
return taosVariantCompare(val1, val2);
}
filter_desc_compare_func gDescCompare [FLD_TYPE_MAX] = {
NULL,
filterFieldColDescCompare,
filterFieldValDescCompare
};
bool filterRangeCompGi (const void *minv, const void *maxv, const void *minr, const void *maxr, __compar_fn_t cfunc) { bool filterRangeCompGi (const void *minv, const void *maxv, const void *minr, const void *maxr, __compar_fn_t cfunc) {
int32_t result = cfunc(maxv, minr); int32_t result = cfunc(maxv, minr);
if (result == TSDB_DATA_JSON_CAN_NOT_COMPARE) return false; if (result == TSDB_DATA_JSON_CAN_NOT_COMPARE) return false;
...@@ -810,7 +789,7 @@ int32_t filterDetachCnfGroups(SArray* group, SArray* left, SArray* right) { ...@@ -810,7 +789,7 @@ int32_t filterDetachCnfGroups(SArray* group, SArray* left, SArray* right) {
int32_t filterGetFiledByDesc(SFilterFields* fields, int32_t type, void *v) { int32_t filterGetFiledByDesc(SFilterFields* fields, int32_t type, void *v) {
for (uint32_t i = 0; i < fields->num; ++i) { for (uint32_t i = 0; i < fields->num; ++i) {
if (0 == gDescCompare[type](fields->fields[i].desc, v)) { if (nodesEqualNode(fields->fields[i].desc, v)) {
return i; return i;
} }
} }
...@@ -901,27 +880,25 @@ static FORCE_INLINE int32_t filterAddColFieldFromField(SFilterInfo *info, SFilte ...@@ -901,27 +880,25 @@ static FORCE_INLINE int32_t filterAddColFieldFromField(SFilterInfo *info, SFilte
} }
int32_t filterAddFieldFromNode(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid) { int32_t filterAddFieldFromNode(SFilterInfo *info, SNode *node, SFilterFieldId *fid) {
if (node == NULL) { if (node == NULL) {
fltError("empty node"); fltError("empty node");
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
if (node->nodeType != TEXPR_BINARYEXPR_NODE && node->nodeType != TEXPR_VALUE_NODE) { if (nodeType(node) != QUERY_NODE_COLUMN_REF && nodeType(node) != QUERY_NODE_VALUE) {
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
int32_t type; int32_t type;
void *v; void *v;
if (node->nodeType == TEXPR_BINARYEXPR_NODE) { if (nodeType(node) == QUERY_NODE_COLUMN_REF) {
type = FLD_TYPE_COLUMN; type = FLD_TYPE_COLUMN;
v = node->pSchema; v = node;
node->pSchema = NULL;
} else { } else {
type = FLD_TYPE_VALUE; type = FLD_TYPE_VALUE;
v = node->pVal; v = node;
node->pVal = NULL;
} }
filterAddField(info, v, NULL, type, fid, 0, true); filterAddField(info, v, NULL, type, fid, 0, true);
...@@ -999,287 +976,36 @@ int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) { ...@@ -999,287 +976,36 @@ int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t filterConvertSetFromBinary(void **q, const char *buf, int32_t len, uint32_t tType, bool tolower) {
SBufferReader br = tbufInitReader(buf, len, false);
uint32_t sType = tbufReadUint32(&br);
SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(tType), true, false);
int32_t code = 0;
taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(tType));
int dummy = -1;
SVariant tmpVar = {0};
size_t t = 0;
int32_t sz = tbufReadInt32(&br);
void *pvar = NULL;
int64_t val = 0;
int32_t bufLen = 0;
if (IS_NUMERIC_TYPE(sType)) {
bufLen = 60; // The maximum length of string that a number is converted to.
} else {
bufLen = 128;
}
char *tmp = calloc(1, bufLen * TSDB_NCHAR_SIZE);
for (int32_t i = 0; i < sz; i++) {
switch (sType) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
*(uint8_t *)&val = (uint8_t)tbufReadInt64(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
*(uint16_t *)&val = (uint16_t)tbufReadInt64(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
*(uint32_t *)&val = (uint32_t)tbufReadInt64(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
*(uint64_t *)&val = (uint64_t)tbufReadInt64(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
*(double *)&val = tbufReadDouble(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(float *)&val = (float)tbufReadDouble(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_BINARY: {
pvar = (char *)tbufReadBinary(&br, &t);
break;
}
case TSDB_DATA_TYPE_NCHAR: {
pvar = (char *)tbufReadBinary(&br, &t);
break;
}
default:
taosHashCleanup(pObj);
*q = NULL;
assert(0);
}
taosVariantCreateFromBinary(&tmpVar, (char *)pvar, t, sType);
if (bufLen < t) {
tmp = realloc(tmp, t * TSDB_NCHAR_SIZE);
bufLen = (int32_t)t;
}
bool converted = false;
char extInfo = 0;
switch (tType) {
case TSDB_DATA_TYPE_BOOL:
if (sType != TSDB_DATA_TYPE_BOOL && !IS_SIGNED_NUMERIC_TYPE(sType)) {
goto _return;
}
if (tmpVar.i64 > 1 ||tmpVar.i64 < 0) {
goto _return;
}
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
if (tVariantDumpEx(&tmpVar, (char *)&val, tType, false, &converted, &extInfo)) {
if (converted) {
tVariantDestroy(&tmpVar);
memset(&tmpVar, 0, sizeof(tmpVar));
continue;
}
goto _return;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
if (tVariantDumpEx(&tmpVar, (char *)&val, tType, false, &converted, &extInfo)) {
if (converted) {
tVariantDestroy(&tmpVar);
memset(&tmpVar, 0, sizeof(tmpVar));
continue;
}
goto _return;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
if (tVariantDumpEx(&tmpVar, (char *)&val, tType, false, &converted, &extInfo)) {
if (converted) {
tVariantDestroy(&tmpVar);
memset(&tmpVar, 0, sizeof(tmpVar));
continue;
}
goto _return;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
if (taosVariantDump(&tmpVar, (char *)&val, tType, false)) {
goto _return;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
if (taosVariantDump(&tmpVar, (char *)&val, tType, false)) {
goto _return;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
if (taosVariantDumpEx(&tmpVar, (char *)&val, tType, false, &converted, &extInfo)) {
if (converted) {
tVariantDestroy(&tmpVar);
memset(&tmpVar, 0, sizeof(tmpVar));
continue;
}
goto _return;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_BINARY: {
if (taosVariantDump(&tmpVar, tmp, tType, true)) {
goto _return;
}
t = varDataLen(tmp);
pvar = varDataVal(tmp);
if (tolower) {
strntolower_s(pvar, pvar, (int32_t)t);
}
break;
}
case TSDB_DATA_TYPE_NCHAR: {
if (taosVariantDump(&tmpVar, tmp, tType, true)) {
goto _return;
}
t = varDataLen(tmp);
pvar = varDataVal(tmp);
break;
}
default:
goto _return;
}
taosHashPut(pObj, (char *)pvar, t, &dummy, sizeof(dummy));
taosVariantDestroy(&tmpVar);
memset(&tmpVar, 0, sizeof(tmpVar));
}
*q = (void *)pObj;
pObj = NULL;
_return:
taosVariantDestroy(&tmpVar);
taosHashCleanup(pObj);
tfree(tmp);
return code;
}
static int32_t filterDealJson(SFilterInfo *info, tExprNode* tree, tExprNode** pLeft) {
if((*pLeft)->nodeType == TSQL_NODE_EXPR && (*pLeft)->_node.optr == TSDB_RELATION_ARROW){ // json tag -> operation
assert(info->pTable != NULL);
SSchema* schema = (*pLeft)->_node.pLeft->pSchema;
if((*pLeft)->_node.pRight->pVal->nLen > TSDB_MAX_JSON_KEY_LEN) return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
jsonKeyMd5((*pLeft)->_node.pRight->pVal->pz, (*pLeft)->_node.pRight->pVal->nLen, keyMd5);
memcpy(schema->name, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
(*pLeft) = (*pLeft)->_node.pLeft; // -> operation use left as input
}else if(((*pLeft)->pSchema->type == TSDB_DATA_TYPE_JSON) &&
(tree->_node.optr == TSDB_RELATION_ISNULL || tree->_node.optr == TSDB_RELATION_NOTNULL)){
SSchema* schema = (*pLeft)->pSchema;
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
uint32_t nullData = TSDB_DATA_JSON_NULL;
jsonKeyMd5(&nullData, INT_BYTES, keyMd5);
memcpy(schema->name, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
}else if(tree->_node.optr == TSDB_RELATION_CONTAINS){
SSchema* schema = (*pLeft)->pSchema;
if(tree->_node.pRight->pVal->nLen > TSDB_MAX_JSON_KEY_LEN) return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
char keyMd5[TSDB_MAX_JSON_KEY_MD5_LEN] = {0};
jsonKeyMd5(tree->_node.pRight->pVal->pz, tree->_node.pRight->pVal->nLen, keyMd5);
memcpy(schema->name, keyMd5, TSDB_MAX_JSON_KEY_MD5_LEN);
}
return TSDB_CODE_SUCCESS;
}
int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) { int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) {
tExprNode* pLeft = tree->_node.pLeft; SOperatorNode *node = (SOperatorNode *)tree;
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if((ret = filterDealJson(info, tree, &pLeft)) != TSDB_CODE_SUCCESS) return ret;
SFilterFieldId left = {0}, right = {0}; SFilterFieldId left = {0}, right = {0};
filterAddFieldFromNode(info, pLeft, &left); filterAddFieldFromNode(info, node->pLeft, &left);
uint8_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(info, left)); uint8_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(info, left));
int32_t len = 0; int32_t len = 0;
uint32_t uidx = 0; uint32_t uidx = 0;
int32_t code = 0;
if (tree->_node.optr == TSDB_RELATION_IN && (!IS_VAR_DATA_TYPE(type))) { if (node->opType == OP_TYPE_IN && (!IS_VAR_DATA_TYPE(type))) {
void *data = NULL; SNodeListNode *listNode = (SNodeListNode *)node->pRight;
SVariant* var = tree->_node.pRight->pVal; void *fdata = NULL;
filterConvertSetFromBinary((void **)&data, var->pz, var->nLen, type, false); SListCell *cell = listNode->pNodeList->pHead;
if (data == NULL) { SScalarParam in = {.num = 1}, out = {.num = 1, .type = type};
fltError("failed to convert in param");
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); for (int32_t i = 0; i < listNode->pNodeList->length; ++i) {
} SValueNode *valueNode = (SValueNode *)cell->pNode;
in.type = valueNode->node.resType.type;
if (taosHashGetSize((SHashObj *)data) <= 0) { in.bytes = valueNode->node.resType.bytes;
filterAddUnit(info, FILTER_DUMMY_EMPTY_OPTR, &left, NULL, &uidx); in.data = nodesGetValueFromNode(valueNode);
out.data = malloc(sizeof(int64_t));
SFilterGroup fgroup = {0};
filterAddUnitToGroup(&fgroup, uidx); code = vectorConvertImpl(&in, &out);
if (code) {
fltError("convert from %d to %d failed", in.type, out.type);
tfree(out.data);
FLT_ERR_RET(code);
}
taosArrayPush(group, &fgroup);
taosHashCleanup(data);
return TSDB_CODE_SUCCESS;
}
void *p = taosHashIterate((SHashObj *)data, NULL);
while(p) {
void *key = taosHashGetDataKey((SHashObj *)data, p);
void *fdata = NULL;
fdata = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(fdata, key);
len = tDataTypes[type].bytes; len = tDataTypes[type].bytes;
filterAddField(info, NULL, &fdata, FLD_TYPE_VALUE, &right, len, true); filterAddField(info, NULL, &fdata, FLD_TYPE_VALUE, &right, len, true);
...@@ -1290,16 +1016,11 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) { ...@@ -1290,16 +1016,11 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) {
filterAddUnitToGroup(&fgroup, uidx); filterAddUnitToGroup(&fgroup, uidx);
taosArrayPush(group, &fgroup); taosArrayPush(group, &fgroup);
p = taosHashIterate((SHashObj *)data, p);
} }
taosHashCleanup(data);
} else { } else {
filterAddFieldFromNode(info, tree->_node.pRight, &right); filterAddFieldFromNode(info, node->pRight, &right);
ret = filterAddUnit(info, tree->_node.optr, &left, &right, &uidx); FLT_ERR_RET(filterAddUnit(info, node->opType, &left, &right, &uidx));
CHK_LRET(ret != TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR, "invalid where condition");
SFilterGroup fgroup = {0}; SFilterGroup fgroup = {0};
filterAddUnitToGroup(&fgroup, uidx); filterAddUnitToGroup(&fgroup, uidx);
...@@ -1984,6 +1705,69 @@ int32_t filterHandleValueExtInfo(SFilterUnit* unit, char extInfo) { ...@@ -1984,6 +1705,69 @@ int32_t filterHandleValueExtInfo(SFilterUnit* unit, char extInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t fltGenerateSetFromList(void **data, void *pNode, uint32_t type) {
SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
if (NULL == pObj) {
fltError("taosHashInit failed, size:%d", 256);
FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type));
int32_t code = 0;
SNodeListNode *nodeList = (SNodeListNode *)pNode;
SListCell *cell = nodeList->pNodeList->pHead;
SScalarParam in = {.num = 1}, out = {.num = 1, .type = type};
int8_t dummy = 0;
int32_t bufLen = 60;
out.data = malloc(bufLen);
int32_t len = 0;
void *buf = NULL;
for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
SValueNode *valueNode = (SValueNode *)cell->pNode;
if (valueNode->node.resType.type != type) {
in.type = valueNode->node.resType.type;
in.bytes = valueNode->node.resType.bytes;
in.data = nodesGetValueFromNode(valueNode);
code = vectorConvertImpl(&in, &out);
if (code) {
fltError("convert from %d to %d failed", in.type, out.type);
FLT_ERR_JRET(code);
}
if (IS_VAR_DATA_TYPE(type)) {
len = varDataLen(out.data);
} else {
len = tDataTypes[type].bytes;
}
buf = out.data;
} else {
buf = nodesGetValueFromNode(valueNode);
len = valueNode->node.resType.bytes;
}
if (taosHashPut(pObj, buf, (size_t)len, &dummy, sizeof(dummy))) {
fltError("taosHashPut failed");
FLT_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
tfree(out.data);
*data = pObj;
return TSDB_CODE_SUCCESS;
_return:
tfree(out.data);
taosHashCleanup(pObj);
FLT_RET(code);
}
int32_t filterInitValFieldData(SFilterInfo *info) { int32_t filterInitValFieldData(SFilterInfo *info) {
for (uint32_t i = 0; i < info->unitNum; ++i) { for (uint32_t i = 0; i < info->unitNum; ++i) {
...@@ -2008,6 +1792,7 @@ int32_t filterInitValFieldData(SFilterInfo *info) { ...@@ -2008,6 +1792,7 @@ int32_t filterInitValFieldData(SFilterInfo *info) {
} }
if (unit->compare.optr == TSDB_RELATION_IN) { if (unit->compare.optr == TSDB_RELATION_IN) {
FLT_ERR_RET(fltGenerateSetFromList((void **)&fi->data, fi->desc, type));
filterConvertSetFromBinary((void **)&fi->data, var->pz, var->nLen, type, false); filterConvertSetFromBinary((void **)&fi->data, var->pz, var->nLen, type, false);
if (fi->data == NULL) { if (fi->data == NULL) {
fltError("failed to convert in param"); fltError("failed to convert in param");
...@@ -2739,7 +2524,7 @@ int32_t filterGenerateColRange(SFilterInfo *info, SFilterGroupCtx** gRes, int32_ ...@@ -2739,7 +2524,7 @@ int32_t filterGenerateColRange(SFilterInfo *info, SFilterGroupCtx** gRes, int32_
if (info->colRange[m] == NULL) { if (info->colRange[m] == NULL) {
info->colRange[m] = filterInitRangeCtx(colInfo->dataType, 0); info->colRange[m] = filterInitRangeCtx(colInfo->dataType, 0);
SFilterField* fi = FILTER_GET_COL_FIELD(info, res->colIdx[n]); SFilterField* fi = FILTER_GET_COL_FIELD(info, res->colIdx[n]);
info->colRange[m]->colId = ((SSchema*)fi->desc)->colId; info->colRange[m]->colId = FILTER_GET_COL_FIELD_ID(fi);
} }
assert(colInfo->type == RANGE_TYPE_MR_CTX); assert(colInfo->type == RANGE_TYPE_MR_CTX);
...@@ -3483,9 +3268,8 @@ int32_t filterSetColFieldData(SFilterInfo *info, void *param, filer_get_col_from ...@@ -3483,9 +3268,8 @@ int32_t filterSetColFieldData(SFilterInfo *info, void *param, filer_get_col_from
for (uint32_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) { for (uint32_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
SFilterField* fi = &info->fields[FLD_TYPE_COLUMN].fields[i]; SFilterField* fi = &info->fields[FLD_TYPE_COLUMN].fields[i];
SSchema* sch = fi->desc;
(*fp)(param, sch->colId, &fi->data); (*fp)(param, FILTER_GET_COL_FIELD_ID(fi), &fi->data);
} }
filterUpdateComUnits(info); filterUpdateComUnits(info);
...@@ -3493,25 +3277,6 @@ int32_t filterSetColFieldData(SFilterInfo *info, void *param, filer_get_col_from ...@@ -3493,25 +3277,6 @@ int32_t filterSetColFieldData(SFilterInfo *info, void *param, filer_get_col_from
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t filterSetJsonColFieldData(SFilterInfo *info, void *param, filer_get_col_from_name fp) {
CHK_LRET(info == NULL, TSDB_CODE_QRY_APP_ERROR, "info NULL");
CHK_LRET(info->fields[FLD_TYPE_COLUMN].num <= 0, TSDB_CODE_QRY_APP_ERROR, "no column fileds");
if (FILTER_ALL_RES(info) || FILTER_EMPTY_RES(info)) {
return TSDB_CODE_SUCCESS;
}
for (uint16_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
SFilterField* fi = &info->fields[FLD_TYPE_COLUMN].fields[i];
SSchema* sch = fi->desc;
(*fp)(param, sch->colId, sch->name, &fi->data);
}
filterUpdateComUnits(info);
return TSDB_CODE_SUCCESS;
}
int32_t fltInitFromNode(SNode* tree, SFilterInfo *info, uint32_t options) { int32_t fltInitFromNode(SNode* tree, SFilterInfo *info, uint32_t options) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -3802,6 +3567,95 @@ int32_t filterFreeNcharColumns(SFilterInfo* info) { ...@@ -3802,6 +3567,95 @@ int32_t filterFreeNcharColumns(SFilterInfo* info) {
EDealRes fltReviseRewriter(SNode** pNode, void* pContext) { EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
SFltTreeStat *stat = (SFltTreeStat *)pContext; SFltTreeStat *stat = (SFltTreeStat *)pContext;
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
SLogicConditionNode *node = (SLogicConditionNode *)*pNode;
SListCell *cell = node->pParameterList->pHead;
for (int32_t i = 0; i < node->pParameterList->length; ++i) {
if (NULL == cell || NULL == cell->pNode) {
sclError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode);
stat->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
}
if ((QUERY_NODE_OPERATOR != nodeType(cell->pNode)) && (QUERY_NODE_LOGIC_CONDITION != nodeType(cell->pNode))) {
stat->scalarMode = true;
}
cell = cell->pNext;
}
return DEAL_RES_CONTINUE;
}
if (stat->scalarMode) {
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_VALUE == nodeType(*pNode)) {
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
SOperatorNode *node = (SOperatorNode *)*pNode;
if (!FLT_IS_COMPARISON_OPERATOR(node->opType)) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
if (NULL == node->pRight) {
if (sclGetOperatorParamNum(node->opType) > 1) {
sclError("invalid operator, pRight:%d, type:%d", node->pRight, nodeType(node));
stat->code = TSDB_CODE_QRY_APP_ERROR;
return DEAL_RES_ERROR;
}
if (QUERY_NODE_COLUMN_REF != nodeType(node->pLeft)) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
} else {
if ((QUERY_NODE_COLUMN_REF != nodeType(node->pLeft)) && (QUERY_NODE_VALUE != nodeType(node->pLeft))) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
if ((QUERY_NODE_COLUMN_REF != nodeType(node->pRight)) && (QUERY_NODE_VALUE != nodeType(node->pRight))) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
if (nodeType(node->pLeft) == nodeType(node->pRight)) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_COLUMN_REF != nodeType(node->pLeft)) {
SNode *t = node->pLeft;
node->pLeft = node->pRight;
node->pRight = t;
}
if (OP_TYPE_IN == node->opType || QUERY_NODE_NODE_LIST != nodeType(node->pRight)) {
fltError("failed to convert in param");
stat->code = TSDB_CODE_QRY_APP_ERROR;
return DEAL_RES_ERROR;
}
}
return DEAL_RES_CONTINUE;
}
sclError("invalid node type for filter, type:%d", nodeType(*pNode));
stat->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
} }
int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) { int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
...@@ -3840,10 +3694,11 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options) ...@@ -3840,10 +3694,11 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options)
if (!info->scalarMode) { if (!info->scalarMode) {
FLT_ERR_JRET(fltInitFromNode(pNode, info, options)); FLT_ERR_JRET(fltInitFromNode(pNode, info, options));
} else {
info->sclCtx.node = pNode;
FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat));
} }
FLT_ERR_JRET(fltOptimizeNodes(info, &pNode, &stat));
return code; return code;
_return: _return:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册