提交 d6021230 编写于 作者: W wpan

improve performance

上级 9a64c0ee
......@@ -31,6 +31,7 @@ extern "C" {
#define FILTER_DEFAULT_GROUP_UNIT_SIZE 2
#define FILTER_DUMMY_EMPTY_OPTR 127
#define FILTER_DUMMY_RANGE_OPTR 126
#define MAX_NUM_STR_SIZE 40
......@@ -96,6 +97,8 @@ typedef struct SFilterColRange {
} SFilterColRange;
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
typedef bool(*filter_exec_func)(void *, int32_t, int8_t*);
typedef struct SFilterRangeCompare {
int64_t s;
......@@ -178,36 +181,52 @@ typedef struct SFilterColCtx {
} SFilterColCtx;
typedef struct SFilterCompare {
__compar_fn_t pCompareFunc;
int32_t type;
uint8_t optr;
uint8_t optr2;
} SFilterCompare;
typedef struct SFilterUnit {
SFilterCompare compare;
SFilterFieldId left;
SFilterFieldId right;
SFilterFieldId right2;
} SFilterUnit;
typedef struct SFilterComUnit {
__compar_fn_t func;
rangeCompFunc rfunc;
void *colData;
void *valData;
void *valData2;
int32_t dataType;
uint16_t dataSize;
uint16_t optr;
} SFilterComUnit;
typedef struct SFilterPCtx {
SHashObj *valHash;
SHashObj *unitHash;
} SFilterPCtx;
typedef struct SFilterInfo {
uint32_t options;
uint32_t status;
uint16_t unitSize;
uint16_t unitNum;
uint16_t groupNum;
uint16_t colRangeNum;
SFilterFields fields[FLD_TYPE_MAX];
SFilterGroup *groups;
SFilterUnit *units;
uint8_t *unitRes; // result
uint8_t *unitFlags; // got result
uint32_t options;
uint32_t status;
uint16_t unitSize;
uint16_t unitNum;
uint16_t groupNum;
uint16_t colRangeNum;
SFilterFields fields[FLD_TYPE_MAX];
SFilterGroup *groups;
uint16_t *cgroups;
SFilterUnit *units;
SFilterComUnit *cunits;
uint8_t *unitRes; // result
uint8_t *unitFlags; // got result
SFilterRangeCtx **colRange;
SFilterPCtx pctx;
filter_exec_func func;
SFilterPCtx pctx;
} SFilterInfo;
#define COL_FIELD_SIZE (sizeof(SFilterField) + 2 * sizeof(int64_t))
......@@ -264,9 +283,11 @@ typedef struct SFilterInfo {
#define FILTER_UNIT_DATA_TYPE(u) ((u)->compare.type)
#define FILTER_UNIT_COL_DESC(i, u) FILTER_GET_COL_FIELD_DESC(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_DATA(i, u, ri) FILTER_GET_COL_FIELD_DATA(FILTER_UNIT_LEFT_FIELD(i, u), ri)
#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_COL_IDX(u) ((u)->left.idx)
#define FILTER_UNIT_OPTR(u) ((u)->compare.optr)
#define FILTER_UNIT_COMP_FUNC(u) ((u)->compare.func)
#define FILTER_UNIT_CLR_F(i) memset((i)->unitFlags, 0, (i)->unitNum * sizeof(*info->unitFlags))
#define FILTER_UNIT_SET_F(i, idx) (i)->unitFlags[idx] = 1
......@@ -282,7 +303,9 @@ typedef struct SFilterInfo {
#define FILTER_ADD_CTX_TO_GRES(gres, idx, ctx) do { if ((gres)->colCtxs == NULL) { (gres)->colCtxs = taosArrayInit(gres->colNum, sizeof(SFilterColCtx)); } SFilterColCtx cCtx = {idx, ctx}; taosArrayPush((gres)->colCtxs, &cCtx); } while (0)
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
#define FILTER_ALL_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_ALL)
#define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY)
extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options);
......
......@@ -114,7 +114,40 @@ rangeCompFunc filterGetRangeCompFunc(char sflag, char eflag) {
return filterRangeCompii;
}
rangeCompFunc filterGetRangeCompFuncFromOptrs(uint8_t optr, uint8_t optr2) {
if (optr2) {
assert(optr2 == TSDB_RELATION_LESS || optr2 == TSDB_RELATION_LESS_EQUAL);
if (optr == TSDB_RELATION_GREATER) {
if (optr2 == TSDB_RELATION_LESS) {
return filterRangeCompee;
}
return filterRangeCompei;
}
if (optr2 == TSDB_RELATION_LESS) {
return filterRangeCompie;
}
return filterRangeCompii;
} else {
switch (optr) {
case TSDB_RELATION_GREATER:
return filterRangeCompGe;
case TSDB_RELATION_GREATER_EQUAL:
return filterRangeCompGi;
case TSDB_RELATION_LESS:
return filterRangeCompLe;
case TSDB_RELATION_LESS_EQUAL:
return filterRangeCompLi;
default:
break;
}
}
return NULL;
}
static FORCE_INLINE int32_t filterCompareGroupCtx(const void *pLeft, const void *pRight) {
SFilterGroupCtx *left = *((SFilterGroupCtx**)pLeft), *right = *((SFilterGroupCtx**)pRight);
......@@ -123,7 +156,6 @@ static FORCE_INLINE int32_t filterCompareGroupCtx(const void *pLeft, const void
return 0;
}
int32_t filterInitUnitsFields(SFilterInfo *info) {
info->unitSize = FILTER_DEFAULT_UNIT_SIZE;
info->units = calloc(info->unitSize, sizeof(SFilterUnit));
......@@ -1128,9 +1160,18 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u
return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, pright, uidx);
}
int32_t filterAddUnitRight(SFilterInfo *info, uint8_t optr, SFilterFieldId *right, uint16_t uidx) {
SFilterUnit *u = &info->units[uidx];
u->compare.optr2 = optr;
u->right2 = *right;
return TSDB_CODE_SUCCESS;
}
int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRangeCtx *ctx, uint16_t cidx, SFilterGroup *g, int32_t optr, SArray *res) {
SFilterFieldId left, right;
SFilterFieldId left, right, right2;
uint16_t uidx = 0;
SFilterField *col = FILTER_GET_COL_FIELD(src, cidx);
......@@ -1174,6 +1215,18 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
return TSDB_CODE_SUCCESS;
} else {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &ra->s);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
void *data2 = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data2, &ra->e);
filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true);
filterAddUnit(dst, FILTER_GET_FLAG(ra->sflag, RANGE_FLG_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right, &uidx);
filterAddUnitRight(dst, FILTER_GET_FLAG(ra->eflag, RANGE_FLG_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &right2, uidx);
filterAddUnitToGroup(g, uidx);
return TSDB_CODE_SUCCESS;
}
}
......@@ -1237,12 +1290,24 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
} else {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &r->ra.s);
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes, true);
void *data2 = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data2, &r->ra.e);
filterAddField(dst, NULL, &data2, FLD_TYPE_VALUE, &right2, tDataTypes[type].bytes, true);
taosArrayPush(res, g);
r = r->next;
continue;
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right, &uidx);
filterAddUnitRight(dst, FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &right2, uidx);
filterAddUnitToGroup(g, uidx);
}
taosArrayPush(res, g);
r = r->next;
continue;
}
if (!FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_NULL)) {
......@@ -1330,7 +1395,8 @@ _return:
int32_t filterInitUnitFunc(SFilterInfo *info) {
for (uint16_t i = 0; i < info->unitNum; ++i) {
SFilterUnit* unit = &info->units[i];
unit->compare.pCompareFunc = getComparFunc(FILTER_UNIT_DATA_TYPE(unit), unit->compare.optr);
info->cunits[i].func = getComparFunc(FILTER_UNIT_DATA_TYPE(unit), unit->compare.optr);
}
return TSDB_CODE_SUCCESS;
......@@ -1524,6 +1590,8 @@ void filterFreePCtx(SFilterPCtx *pctx) {
void filterFreeInfo(SFilterInfo *info) {
CHK_RETV(info == NULL);
tfree(info->cunits);
for (int32_t i = 0; i < FLD_TYPE_MAX; ++i) {
for (uint16_t f = 0; f < info->fields[i].num; ++f) {
filterFreeField(&info->fields[i].fields[f], i);
......@@ -1649,10 +1717,10 @@ int32_t filterInitValFieldData(SFilterInfo *info) {
}
bool filterDoCompare(SFilterUnit *unit, void *left, void *right) {
int32_t ret = unit->compare.pCompareFunc(left, right);
bool filterDoCompare(__compar_fn_t func, uint8_t optr, void *left, void *right) {
int32_t ret = func(left, right);
switch (unit->compare.optr) {
switch (optr) {
case TSDB_RELATION_EQUAL: {
return ret == 0;
}
......@@ -2330,6 +2398,241 @@ int32_t filterPostProcessRange(SFilterInfo *info) {
}
int32_t filterGenerateComInfo(SFilterInfo *info) {
uint16_t n = 0;
info->cunits = malloc(info->unitNum * sizeof(*info->cunits));
for (uint16_t i = 0; i < info->unitNum; ++i) {
SFilterUnit *unit = &info->units[i];
info->cunits[i].func = getComparFunc(FILTER_UNIT_DATA_TYPE(unit), unit->compare.optr);
info->cunits[i].rfunc = filterGetRangeCompFuncFromOptrs(unit->compare.optr, unit->compare.optr2);
info->cunits[i].optr = FILTER_UNIT_OPTR(unit);
info->cunits[i].colData = NULL;
if (unit->right.type == FLD_TYPE_VALUE) {
info->cunits[i].valData = FILTER_UNIT_VAL_DATA(info, unit);
} else {
info->cunits[i].valData = NULL;
}
if (unit->right2.type == FLD_TYPE_VALUE) {
info->cunits[i].valData2 = FILTER_GET_VAL_FIELD_DATA(FILTER_GET_FIELD(info, unit->right2));
} else {
info->cunits[i].valData2 = info->cunits[i].valData;
}
info->cunits[i].dataSize = FILTER_UNIT_COL_SIZE(info, unit);
info->cunits[i].dataType = FILTER_UNIT_DATA_TYPE(unit);
}
uint16_t cgroupNum = info->groupNum + 1;
for (uint16_t i = 0; i < info->groupNum; ++i) {
cgroupNum += info->groups[i].unitNum;
}
info->cgroups = malloc(cgroupNum * sizeof(*info->cgroups));
for (uint16_t i = 0; i < info->groupNum; ++i) {
info->cgroups[n++] = info->groups[i].unitNum;
for (uint16_t m = 0; m < info->groups[i].unitNum; ++m) {
info->cgroups[n++] = info->groups[i].unitIdxs[m];
}
}
info->cgroups[n] = 0;
return TSDB_CODE_SUCCESS;
}
int32_t filterUpdateComUnits(SFilterInfo *info) {
for (uint16_t i = 0; i < info->unitNum; ++i) {
SFilterUnit *unit = &info->units[i];
info->cunits[i].colData = FILTER_UNIT_COL_DATA(info, unit, 0);
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, int8_t* p) {
return true;
}
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, int8_t* p) {
return false;
}
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, int8_t* p) {
SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true;
for (int32_t i = 0; i < numOfRows; ++i) {
uint16_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
p[i] = isNull(colData, info->cunits[uidx].dataType);
if (p[i] == 0) {
all = false;
}
}
return all;
}
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, int8_t* p) {
SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true;
for (int32_t i = 0; i < numOfRows; ++i) {
uint16_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
p[i] = !isNull(colData, info->cunits[uidx].dataType);
if (p[i] == 0) {
all = false;
}
}
return all;
}
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t* p) {
SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true;
for (int32_t i = 0; i < numOfRows; ++i) {
uint16_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
if (isNull(colData, info->cunits[uidx].dataType)) {
all = false;
continue;
}
p[i] = (*info->cunits[uidx].rfunc)(colData, colData, info->cunits[uidx].valData, info->cunits[uidx].valData2, info->cunits[uidx].func);
if (p[i] == 0) {
all = false;
}
}
return all;
}
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t* p) {
SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true;
for (int32_t i = 0; i < numOfRows; ++i) {
uint16_t uidx = info->groups[0].unitIdxs[0];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
if (isNull(colData, info->cunits[uidx].dataType)) {
all = false;
continue;
}
p[i] = filterDoCompare(info->cunits[uidx].func, info->cunits[uidx].optr, colData, info->cunits[uidx].valData);
if (p[i] == 0) {
all = false;
}
}
return all;
}
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t* p) {
SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true;
for (int32_t i = 0; i < numOfRows; ++i) {
//FILTER_UNIT_CLR_F(info);
for (uint32_t g = 0; g < info->groupNum; ++g) {
for (uint32_t u = 0; u < info->groups[g].unitNum; ++u) {
uint16_t uidx = info->groups[g].unitIdxs[u];
void *colData = (char *)info->cunits[uidx].colData + info->cunits[uidx].dataSize * i;
//if (FILTER_UNIT_GET_F(info, uidx)) {
// p[i] = FILTER_UNIT_GET_R(info, uidx);
//} else {
uint8_t optr = info->cunits[uidx].optr;
if (isNull(colData, info->cunits[uidx].dataType)) {
p[i] = optr == TSDB_RELATION_ISNULL ? true : false;
} else {
if (optr == TSDB_RELATION_NOTNULL) {
p[i] = 1;
} else if (optr == TSDB_RELATION_ISNULL) {
p[i] = 0;
} else if (info->cunits[uidx].rfunc) {
p[i] = (*info->cunits[uidx].rfunc)(colData, colData, info->cunits[uidx].valData, info->cunits[uidx].valData2, info->cunits[uidx].func);
} else {
p[i] = filterDoCompare(info->cunits[uidx].func, info->cunits[uidx].optr, colData, info->cunits[uidx].valData);
}
//FILTER_UNIT_SET_R(info, uidx, p[i]);
//FILTER_UNIT_SET_F(info, uidx);
}
if (p[i] == 0) {
break;
}
}
if (p[i]) {
break;
}
}
if (p[i] == 0) {
all = false;
}
}
return all;
}
FORCE_INLINE bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) {
return (*info->func)(info, numOfRows, p);
}
int32_t filterSetExecFunc(SFilterInfo *info) {
if (FILTER_ALL_RES(info)) {
info->func = filterExecuteImplAll;
return TSDB_CODE_SUCCESS;
}
if (FILTER_EMPTY_RES(info)) {
info->func = filterExecuteImplEmpty;
return TSDB_CODE_SUCCESS;
}
if (info->unitNum > 1) {
info->func = filterExecuteImpl;
return TSDB_CODE_SUCCESS;
}
if (info->units[0].compare.optr == TSDB_RELATION_ISNULL) {
info->func = filterExecuteImplIsNull;
return TSDB_CODE_SUCCESS;
}
if (info->units[0].compare.optr == TSDB_RELATION_NOTNULL) {
info->func = filterExecuteImplNotNull;
return TSDB_CODE_SUCCESS;
}
if (info->cunits[0].rfunc) {
info->func = filterExecuteImplRange;
return TSDB_CODE_SUCCESS;
}
info->func = filterExecuteImplMisc;
return TSDB_CODE_SUCCESS;
}
int32_t filterPreprocess(SFilterInfo *info) {
SFilterGroupCtx** gRes = calloc(info->groupNum, sizeof(SFilterGroupCtx *));
int32_t gResNum = 0;
......@@ -2357,8 +2660,12 @@ int32_t filterPreprocess(SFilterInfo *info) {
filterRewrite(info, gRes, gResNum);
filterGenerateComInfo(info);
_return:
filterSetExecFunc(info);
for (int32_t i = 0; i < gResNum; ++i) {
filterFreeGroupCtx(gRes[i]);
}
......@@ -2368,83 +2675,27 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t filterSetColFieldData(SFilterInfo *info, int16_t colId, void *data) {
CHK_LRET(info == NULL, TSDB_CODE_QRY_APP_ERROR, "info NULL");
CHK_LRET(info->fields[FLD_TYPE_COLUMN].num <= 0, TSDB_CODE_QRY_APP_ERROR, "no column fileds");
if (FILTER_ALL_RES(info) || FILTER_EMPTY_RES(info)) {
return TSDB_CODE_SUCCESS;
}
for (uint16_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
SFilterField* fi = &info->fields[FLD_TYPE_COLUMN].fields[i];
SSchema* sch = fi->desc;
if (sch->colId == colId) {
fi->data = data;
break;
}
}
return TSDB_CODE_SUCCESS;
}
bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) {
bool all = true;
if (FILTER_GET_FLAG(info->status, FI_STATUS_EMPTY)) {
return false;
}
for (int32_t i = 0; i < numOfRows; ++i) {
FILTER_UNIT_CLR_F(info);
filterUpdateComUnits(info);
p[i] = 0;
for (uint16_t g = 0; g < info->groupNum; ++g) {
SFilterGroup* group = &info->groups[g];
bool qualified = true;
for (uint16_t u = 0; u < group->unitNum; ++u) {
uint16_t uidx = group->unitIdxs[u];
uint8_t ures = 0;
if (FILTER_UNIT_GET_F(info, uidx)) {
ures = FILTER_UNIT_GET_R(info, uidx);
} else {
SFilterUnit *unit = &info->units[uidx];
if (isNull(FILTER_UNIT_COL_DATA(info, unit, i), FILTER_UNIT_DATA_TYPE(unit))) {
ures = unit->compare.optr == TSDB_RELATION_ISNULL ? true : false;
} else {
if (unit->compare.optr == TSDB_RELATION_NOTNULL) {
ures = true;
} else if (unit->compare.optr == TSDB_RELATION_ISNULL) {
ures = false;
} else {
ures = filterDoCompare(unit, FILTER_UNIT_COL_DATA(info, unit, i), FILTER_UNIT_VAL_DATA(info, unit));
}
}
FILTER_UNIT_SET_R(info, uidx, ures);
FILTER_UNIT_SET_F(info, uidx);
}
if (!ures) {
qualified = ures;
break;
}
}
if (qualified) {
p[i] = 1;
break;
}
}
if (p[i] != 1) {
all = false;
}
}
return all;
return TSDB_CODE_SUCCESS;
}
......@@ -2485,9 +2736,9 @@ int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t option
taosArrayDestroy(group);
return code;
}
}
ERR_JRET(filterInitUnitFunc(info));
ERR_JRET(filterInitUnitFunc(info));
}
info->unitRes = malloc(info->unitNum * sizeof(*info->unitRes));
info->unitFlags = malloc(info->unitNum * sizeof(*info->unitFlags));
......@@ -2510,19 +2761,17 @@ _return:
return code;
}
FORCE_INLINE bool filterIsEmptyRes(SFilterInfo *info) {
if (info == NULL) {
return false;
}
return FILTER_GET_FLAG(info->status, FI_STATUS_EMPTY);
}
bool filterRangeExecute(SFilterInfo *info, SDataStatis *pDataStatis, int32_t numOfCols, int32_t numOfRows) {
if (filterIsEmptyRes(info)) {
if (FILTER_EMPTY_RES(info)) {
return false;
}
if (FILTER_ALL_RES(info)) {
return true;
}
bool ret = true;
void *minVal, *maxVal;
......@@ -2710,6 +2959,10 @@ int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar
}
}
if (*gotNchar) {
filterUpdateComUnits(info);
}
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册