提交 6c844c75 编写于 作者: dengyihao's avatar dengyihao

enh(index): support numberic and json filter

上级 03c3c41d
......@@ -20,11 +20,23 @@
extern "C" {
#endif
#include "indexInt.h"
#include "tcompare.h"
extern char JSON_COLUMN[];
extern char JSON_VALUE_DELIM;
char* indexPackJsonData(SIndexTerm* itm);
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip);
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b);
_cache_range_compare indexGetCompare(RangeType ty);
#ifdef __cplusplus
}
#endif
......
......@@ -60,50 +60,6 @@ static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
RangeType type);
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
// optime later
int32_t ret = func(a, b);
switch (comType) {
case QUERY_LESS_THAN: {
if (ret < 0) return MATCH;
} break;
case QUERY_LESS_EQUAL: {
if (ret <= 0) return MATCH;
break;
}
case QUERY_GREATER_THAN: {
if (ret > 0) return MATCH;
break;
}
case QUERY_GREATER_EQUAL: {
if (ret >= 0) return MATCH;
}
}
return CONTINUE;
}
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
}
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
}
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
tCompareGreaterThan, tCompareGreaterEqual};
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
{cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
......@@ -169,7 +125,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
return 0;
}
_cache_range_compare cmpFn = rangeCompare[type];
_cache_range_compare cmpFn = indexGetCompare(type);
MemTable* mem = cache;
IndexCache* pCache = mem->pCache;
......@@ -295,7 +251,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
if (cache == NULL) {
return 0;
}
_cache_range_compare cmpFn = rangeCompare[type];
_cache_range_compare cmpFn = indexGetCompare(type);
MemTable* mem = cache;
IndexCache* pCache = mem->pCache;
......
......@@ -13,12 +13,58 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "indexComm.h"
#include "index.h"
#include "indexInt.h"
#include "tcompare.h"
char JSON_COLUMN[] = "JSON";
char JSON_VALUE_DELIM = '&';
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
}
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
}
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
// optime later
int32_t ret = func(a, b);
switch (comType) {
case QUERY_LESS_THAN: {
if (ret < 0) return MATCH;
} break;
case QUERY_LESS_EQUAL: {
if (ret <= 0) return MATCH;
break;
}
case QUERY_GREATER_THAN: {
if (ret > 0) return MATCH;
break;
}
case QUERY_GREATER_EQUAL: {
if (ret >= 0) return MATCH;
}
}
return CONTINUE;
}
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
tCompareGreaterThan, tCompareGreaterEqual};
_cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; }
char* indexPackJsonData(SIndexTerm* itm) {
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
......@@ -46,6 +92,7 @@ char* indexPackJsonData(SIndexTerm* itm) {
return buf;
}
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
......
......@@ -72,9 +72,23 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);
static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan,
tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange};
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);
static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
{tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual,
tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange},
{tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}};
TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
......@@ -202,14 +216,10 @@ void tfileReaderDestroy(TFileReader* reader) {
taosMemoryFree(reader);
}
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0;
char* p = tem->colVal;
uint64_t sz = tem->nColVal;
if (hasJson) {
p = indexPackJsonData(tem);
sz = strlen(p);
}
int64_t st = taosGetTimestampUs();
FstSlice key = fstSliceCreate(p, sz);
uint64_t offset;
......@@ -224,9 +234,6 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
tem->colName, tem->colVal, cost);
}
if (hasJson) {
taosMemoryFree(p);
}
fstSliceDestroy(&key);
return 0;
}
......@@ -308,14 +315,11 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
}
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0;
char* p = tem->colVal;
int skip = 0;
int ret = 0;
char* p = tem->colVal;
int skip = 0;
_cache_range_compare cmpFn = indexGetCompare(type);
if (hasJson) {
p = indexPackJsonDataPrefix(tem, &skip);
}
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
......@@ -328,7 +332,16 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult
StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
taosArrayPush(offsets, &(rt->out.out));
FstSlice* s = &rt->data;
char* ch = (char*)fstSliceData(s, NULL);
TExeCond cond = cmpFn(ch, p, tem->colType);
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
} else if (BREAK == cond) {
swsResultDestroy(rt);
break;
}
swsResultDestroy(rt);
}
streamWithStateDestroy(st);
......@@ -376,17 +389,105 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
fstSliceDestroy(&key);
return 0;
}
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
int ret = 0;
char* p = indexPackJsonData(tem);
int sz = strlen(p);
int64_t st = taosGetTimestampUs();
FstSlice key = fstSliceCreate(p, sz);
uint64_t offset;
if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
int64_t et = taosGetTimestampUs();
int64_t cost = et - st;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us",
tem->suid, tem->colName, tem->colVal, cost);
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
cost = taosGetTimestampUs() - et;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
tem->colName, tem->colVal, cost);
}
fstSliceDestroy(&key);
return 0;
// deprecate api
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, LT);
}
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, LE);
}
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, GT);
}
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, GE);
}
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype) {
int ret = 0;
int skip = 0;
char* p = indexPackJsonDataPrefix(tem, &skip);
_cache_range_compare cmpFn = indexGetCompare(ctype);
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
FstSlice h = fstSliceCreate((uint8_t*)p, skip);
fstStreamBuilderSetRange(sb, &h, ctype);
fstSliceDestroy(&h);
StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
FstSlice* s = &rt->data;
char* ch = (char*)fstSliceData(s, NULL);
TExeCond cond = cmpFn(ch, p, tem->colType);
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
} else if (BREAK == cond) {
swsResultDestroy(rt);
break;
}
swsResultDestroy(rt);
}
streamWithStateDestroy(st);
fstStreamBuilderDestroy(sb);
return TSDB_CODE_SUCCESS;
}
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
if (qtype >= sizeof(tfSearch) / sizeof(tfSearch[0])) {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
term->colVal);
return -1;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
return tfSearch[1][qtype](reader, term, tr);
} else {
return tfSearch[qtype](reader, term, tr);
return tfSearch[0][qtype](reader, term, tr);
}
tfileReaderUnRef(reader);
return 0;
}
......
......@@ -1070,7 +1070,7 @@ void transSendResponse(const STransMsg* msg) {
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
srvMsg->type = Normal;
tTrace("server conn %p start to send resp (1/2)", exh->handle);
tDebug("server conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId);
return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册