未验证 提交 c108dd5d 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #10379 from taosdata/feature/qnode

Feature/qnode
......@@ -68,8 +68,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
}
#define colDataGet(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: (p1_)->pData + ((r_) * (p1_)->info.bytes));
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
......
......@@ -37,8 +37,8 @@ typedef struct SFilterColumnParam{
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param, filer_get_col_from_id fp);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param, filer_get_col_from_id fp);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
......
......@@ -26,6 +26,7 @@ extern "C" {
#include "scalar.h"
#include "querynodes.h"
#include "query.h"
#include "tep.h"
#define FILTER_DEFAULT_GROUP_SIZE 4
#define FILTER_DEFAULT_UNIT_SIZE 4
......@@ -199,7 +200,7 @@ typedef struct SFilterUnit {
} SFilterUnit;
typedef struct SFilterComUnit {
void *colData;
void *colData; // pointer to SColumnInfoData
void *valData;
void *valData2;
uint16_t colId;
......@@ -282,7 +283,7 @@ typedef struct SFilterInfo {
#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0)
#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0)
#define FLT_IS_COMPARISON_OPERATOR(_op) ((_op) >= OP_TYPE_GREATER_THAN && (_op) < OP_TYPE_IS_NOT_NULL)
#define FLT_IS_COMPARISON_OPERATOR(_op) ((_op) >= OP_TYPE_GREATER_THAN && (_op) < OP_TYPE_IS_NOT_UNKNOWN)
#define fltFatal(...) qFatal(__VA_ARGS__)
#define fltError(...) qError(__VA_ARGS__)
......@@ -305,7 +306,7 @@ typedef struct SFilterInfo {
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnRefNode *)((fi)->desc))->columnId)
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnRefNode *)((fi)->desc))->slotId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnRefNode *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SColumnRefNode *)((fi)->desc))->dataType.bytes * (ri))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) (colDataGet(((SColumnInfoData *)(fi)->data), (ri)))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((SValueNode *)((fi)->desc))->node.resType.type)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc)
......
......@@ -914,7 +914,7 @@ int32_t filterAddFieldFromNode(SFilterInfo *info, SNode *node, SFilterFieldId *f
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (nodeType(node) != QUERY_NODE_COLUMN_REF && nodeType(node) != QUERY_NODE_VALUE) {
if (nodeType(node) != QUERY_NODE_COLUMN_REF && nodeType(node) != QUERY_NODE_VALUE && nodeType(node) != QUERY_NODE_NODE_LIST) {
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -968,8 +968,10 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi
SFilterField *val = FILTER_UNIT_RIGHT_FIELD(info, u);
assert(FILTER_GET_FLAG(val->flag, FLD_TYPE_VALUE));
} else {
if(optr != OP_TYPE_IS_NULL && optr != OP_TYPE_IS_NOT_NULL && optr != FILTER_DUMMY_EMPTY_OPTR){
return -1;
int32_t paramNum = scalarGetOperatorParamNum(optr);
if (1 != paramNum) {
fltError("invalid right field in unit, operator:%s, rightType:%d", gOptrStr[optr].str, u->right.type);
return TSDB_CODE_QRY_APP_ERROR;
}
}
......@@ -1016,7 +1018,6 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) {
if (node->opType == OP_TYPE_IN && (!IS_VAR_DATA_TYPE(type))) {
SNodeListNode *listNode = (SNodeListNode *)node->pRight;
void *fdata = NULL;
SListCell *cell = listNode->pNodeList->pHead;
SScalarParam in = {.num = 1}, out = {.num = 1, .type = type};
......@@ -1036,7 +1037,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) {
len = tDataTypes[type].bytes;
filterAddField(info, NULL, &fdata, FLD_TYPE_VALUE, &right, len, true);
filterAddField(info, NULL, &out.data, FLD_TYPE_VALUE, &right, len, true);
filterAddUnit(info, OP_TYPE_EQUAL, &left, &right, &uidx);
......@@ -1337,6 +1338,8 @@ EDealRes fltTreeToGroup(SNode* pNode, void* pContext) {
return DEAL_RES_IGNORE_CHILD;
}
ctx->code = TSDB_CODE_QRY_APP_ERROR;
fltError("invalid condition type, type:%d", node->condType);
return DEAL_RES_ERROR;
......@@ -1736,7 +1739,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
for (uint32_t i = 0; i < info->unitNum; ++i) {
SFilterUnit* unit = &info->units[i];
if (unit->right.type != FLD_TYPE_VALUE) {
assert(unit->compare.optr == OP_TYPE_IS_NULL || unit->compare.optr == OP_TYPE_IS_NOT_NULL || unit->compare.optr == FILTER_DUMMY_EMPTY_OPTR);
assert(unit->compare.optr == FILTER_DUMMY_EMPTY_OPTR || scalarGetOperatorParamNum(unit->compare.optr) == 1);
continue;
}
......@@ -1792,13 +1795,15 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
}
if(type != TSDB_DATA_TYPE_JSON){
bool converted = false;
char extInfo = 0;
SScalarParam in = {.data = nodesGetValueFromNode(var), .num = 1, .type = dType->type, .bytes = dType->bytes};
SScalarParam out = {.data = fi->data, .num = 1, .type = type};
if (vectorConvertImpl(&in, &out)) {
qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION;
if (dType->type == type) {
assignVal(fi->data, nodesGetValueFromNode(var), dType->bytes, type);
} else {
SScalarParam in = {.data = nodesGetValueFromNode(var), .num = 1, .type = dType->type, .bytes = dType->bytes};
SScalarParam out = {.data = fi->data, .num = 1, .type = type};
if (vectorConvertImpl(&in, &out)) {
qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
}
......@@ -1809,7 +1814,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
int32_t len = taosUcs4ToMbs(varDataVal(fi->data), varDataLen(fi->data), varDataVal(newValData));
if (len < 0){
qError("filterInitValFieldData taosUcs4ToMbs error 1");
return TSDB_CODE_FAILED;
return TSDB_CODE_QRY_APP_ERROR;
}
varDataSetLen(newValData, len);
varDataCopy(fi->data, newValData);
......@@ -2565,7 +2570,8 @@ int32_t filterUpdateComUnits(SFilterInfo *info) {
for (uint32_t i = 0; i < info->unitNum; ++i) {
SFilterUnit *unit = &info->units[i];
info->cunits[i].colData = FILTER_UNIT_COL_DATA(info, unit, 0);
SFilterField *col = FILTER_UNIT_LEFT_FIELD(info, unit);
info->cunits[i].colData = col->data;
}
return TSDB_CODE_SUCCESS;
......@@ -2770,7 +2776,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
uint32_t unitNum = *(unitIdx++);
for (uint32_t u = 0; u < unitNum; ++u) {
SFilterComUnit *cunit = &info->cunits[*(unitIdx + u)];
void *colData = (char *)cunit->colData + cunit->dataSize * i;
void *colData = colDataGet((SColumnInfoData *)cunit->colData, i);
//if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx);
......@@ -2868,7 +2874,7 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i);
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){
if (!colData){ // for json->'key' is null
(*p)[i] = 1;
......@@ -2902,7 +2908,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i);
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_JSON){
if (!colData) { // for json->'key' is not null
......@@ -2929,7 +2935,6 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true;
uint16_t dataSize = info->cunits[0].dataSize;
char *colData = (char *)info->cunits[0].colData;
rangeCompFunc rfunc = gRangeCompare[info->cunits[0].rfunc];
void *valData = info->cunits[0].valData;
void *valData2 = info->cunits[0].valData2;
......@@ -2943,10 +2948,11 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
*p = calloc(numOfRows, sizeof(int8_t));
}
for (int32_t i = 0; i < numOfRows; ++i) {
for (int32_t i = 0; i < numOfRows; ++i) {
void *colData = colDataGet((SColumnInfoData *)info->cunits[0].colData, i);
if (colData == NULL || isNull(colData, info->cunits[0].dataType)) {
all = false;
colData += dataSize;
continue;
}
......@@ -2955,8 +2961,6 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
if ((*p)[i] == 0) {
all = false;
}
colData += dataSize;
}
return all;
......@@ -2976,7 +2980,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
void *colData = colDataGet((SColumnInfoData *)info->cunits[uidx].colData, i);
if (colData == NULL || isNull(colData, info->cunits[uidx].dataType)) {
(*p)[i] = 0;
all = false;
......@@ -3027,7 +3031,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
for (uint32_t u = 0; u < group->unitNum; ++u) {
uint32_t uidx = group->unitIdxs[u];
SFilterComUnit *cunit = &info->cunits[uidx];
void *colData = (char *)cunit->colData + cunit->dataSize * i;
void *colData = colDataGet((SColumnInfoData *)(cunit->colData), i);
//if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx);
......@@ -3483,10 +3487,6 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
if (stat->scalarMode) {
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode) || QUERY_NODE_COLUMN_REF == nodeType(*pNode)) {
return DEAL_RES_CONTINUE;
}
......@@ -3505,7 +3505,7 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
if (NULL == node->pRight) {
if (scalarGetOperatorParamNum(node->opType) > 1) {
fltError("invalid operator, pRight:%p, type:%d", node->pRight, nodeType(node));
fltError("invalid operator, pRight:%p, nodeType:%d, opType:%d", node->pRight, nodeType(node), node->opType);
stat->code = TSDB_CODE_QRY_APP_ERROR;
return DEAL_RES_ERROR;
}
......@@ -3514,6 +3514,12 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
if (OP_TYPE_IS_TRUE == node->opType || OP_TYPE_IS_FALSE == node->opType || OP_TYPE_IS_UNKNOWN == node->opType
|| OP_TYPE_IS_NOT_TRUE == node->opType || OP_TYPE_IS_NOT_FALSE == node->opType || OP_TYPE_IS_NOT_UNKNOWN == node->opType) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
} else {
if ((QUERY_NODE_COLUMN_REF != nodeType(node->pLeft)) && (QUERY_NODE_VALUE != nodeType(node->pLeft))) {
stat->scalarMode = true;
......@@ -3570,18 +3576,19 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
}
int32_t fltOptimizeNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
//TODO
return TSDB_CODE_SUCCESS;
}
int32_t filterGetDataFromColId(void *param, int32_t id, void **data) {
int32_t fltGetDataFromColId(void *param, int32_t id, void **data) {
int32_t numOfCols = ((SFilterColumnParam *)param)->numOfCols;
SArray* pDataBlock = ((SFilterColumnParam *)param)->pDataBlock;
for (int32_t j = 0; j < numOfCols; ++j) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, j);
if (id == pColInfo->info.colId) {
*data = pColInfo->pData;
*data = pColInfo;
break;
}
}
......@@ -3589,13 +3596,28 @@ int32_t filterGetDataFromColId(void *param, int32_t id, void **data) {
return TSDB_CODE_SUCCESS;
}
int32_t fltGetDataFromSlotId(void *param, int32_t id, void **data) {
int32_t numOfCols = ((SFilterColumnParam *)param)->numOfCols;
SArray* pDataBlock = ((SFilterColumnParam *)param)->pDataBlock;
if (id < 0 || id >= numOfCols || id >= taosArrayGetSize(pDataBlock)) {
fltError("invalid slot id, id:%d, numOfCols:%d, arraySize:%d", id, numOfCols, (int32_t)taosArrayGetSize(pDataBlock));
return TSDB_CODE_QRY_APP_ERROR;
}
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, id);
*data = pColInfo;
return TSDB_CODE_SUCCESS;
}
int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param, filer_get_col_from_id fp) {
return fltSetColFieldDataImpl(info, param, fp, false);
int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param) {
return fltSetColFieldDataImpl(info, param, fltGetDataFromSlotId, false);
}
int32_t filterSetDataFromColId(SFilterInfo *info, void *param, filer_get_col_from_id fp) {
return fltSetColFieldDataImpl(info, param, fp, true);
int32_t filterSetDataFromColId(SFilterInfo *info, void *param) {
return fltSetColFieldDataImpl(info, param, fltGetDataFromColId, true);
}
......@@ -3623,6 +3645,8 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options)
SFltTreeStat stat = {0};
FLT_ERR_JRET(fltReviseNodes(info, &pNode, &stat));
info->scalarMode = stat.scalarMode;
if (!info->scalarMode) {
FLT_ERR_JRET(fltInitFromNode(pNode, info, options));
} else {
......@@ -3647,7 +3671,15 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pSrc, &output));
*p = output.data;
return TSDB_CODE_SUCCESS;
int8_t *r = output.data;
for (int32_t i = 0; i < output.num; ++i) {
if (0 == *(r+i)) {
return false;
}
}
return true;
}
return (*info->func)(info, pSrc->info.rows, p, statis, numOfCols);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册