未验证 提交 4d99b003 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #12168 from taosdata/enh/supportIdxFilter

enh(index): support more data type
......@@ -56,7 +56,8 @@ typedef enum {
QUERY_LESS_EQUAL,
QUERY_GREATER_THAN,
QUERY_GREATER_EQUAL,
QUERY_RANGE
QUERY_RANGE,
QUERY_MAX
} EIndexQueryType;
/*
......@@ -178,8 +179,8 @@ void indexOptsDestroy(SIndexOpts* opts);
* @param:
*/
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, int8_t qType, uint8_t colType,
const char* colName, int32_t nColName, const char* colVal, int32_t nColVal);
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal);
void indexTermDestroy(SIndexTerm* p);
/*
......
......@@ -252,8 +252,8 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
return TSDB_CODE_QRY_INVALID_INPUT;
}
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, operType, left->colValType, left->colName,
strlen(left->colName), right->condValue, strlen(right->condValue));
SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
right->condValue, strlen(right->condValue));
if (tm == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......
......@@ -31,6 +31,7 @@ extern "C" {
typedef struct MemTable {
T_REF_DECLARE()
SSkipList* mem;
void* pCache;
} MemTable;
typedef struct IndexCache {
T_REF_DECLARE()
......@@ -47,6 +48,7 @@ typedef struct IndexCache {
} IndexCache;
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
typedef struct CacheTerm {
// key
char* colVal;
......
......@@ -24,7 +24,7 @@ extern char JSON_COLUMN[];
extern char JSON_VALUE_DELIM;
char* indexPackJsonData(SIndexTerm* itm);
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip);
#ifdef __cplusplus
}
#endif
......
......@@ -166,7 +166,9 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
} while (0)
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
#define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
do { \
uint8_t oldTy = ty; \
......
......@@ -244,8 +244,8 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
return 0;
}
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryType, uint8_t colType,
const char* colName, int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
if (tm == NULL) {
return NULL;
......
......@@ -34,44 +34,96 @@ static char* indexCacheTermGet(const void* pData);
static MemTable* indexInternalCacheCreate(int8_t type);
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
/*comm func of compare, used in (LE/LT/GE/GT compare)*/
static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s,
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s,
RangeType type);
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
RangeType type);
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
int32_t ret = func(a, b);
switch (comType) {
case QUERY_LESS_THAN: {
if (ret < 0) return MATCH;
} break;
case QUERY_LESS_EQUAL: {
if (ret <= 0) return MATCH;
break;
}
case QUERY_GREATER_THAN: {
if (ret > 0) return MATCH;
break;
}
case QUERY_GREATER_EQUAL: {
if (ret >= 0) return MATCH;
}
}
return CONTINUE;
}
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
}
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
}
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
tCompareGreaterThan, tCompareGreaterEqual};
static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan,
cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange};
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
{cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
{cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON,
cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON,
cacheSearchRange_JSON}};
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
if (cache == NULL) {
return 0;
}
MemTable* mem = cache;
IndexCache* pCache = mem->pCache;
MemTable* mem = cache;
char* key = indexCacheTermGet(ct);
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version);
char* key = indexCacheTermGet(pCt);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) {
......@@ -80,7 +132,7 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S
break;
}
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
if (0 == strcmp(c->colVal, ct->colVal)) {
if (0 == strcmp(c->colVal, pCt->colVal)) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
// taosArrayPush(result, &c->uid);
......@@ -92,30 +144,39 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S
break;
}
}
taosMemoryFree(pCt);
tSkipListDestroyIter(iter);
return 0;
}
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
// impl later
return 0;
}
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
// impl later
return 0;
}
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
// impl later
return 0;
}
static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s,
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
RangeType type) {
if (cache == NULL) {
return 0;
}
_cache_range_compare cmpFn = rangeCompare[type];
MemTable* mem = cache;
char* key = indexCacheTermGet(ct);
MemTable* mem = cache;
IndexCache* pCache = mem->pCache;
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version);
char* key = indexCacheTermGet(pCt);
SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
while (tSkipListIterNext(iter)) {
......@@ -124,7 +185,7 @@ static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult
break;
}
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
TExeCond cond = cmpFn(c->colVal, ct->colVal, ct->colType);
TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
if (cond == MATCH) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
......@@ -134,26 +195,153 @@ static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
}
} else if (cond == CONTINUE) {
continue;
} else if (cond == BREAK) {
break;
}
}
taosMemoryFree(pCt);
tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, ct, tr, s, LT);
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, term, tr, s, LT);
}
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, term, tr, s, LE);
}
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, term, tr, s, GT);
}
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, term, tr, s, GE);
}
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
if (cache == NULL) {
return 0;
}
MemTable* mem = cache;
IndexCache* pCache = mem->pCache;
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version);
char* exBuf = NULL;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
exBuf = indexPackJsonData(term);
pCt->colVal = exBuf;
}
char* key = indexCacheTermGet(pCt);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
if (node == NULL) {
break;
}
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
if (0 == strcmp(c->colVal, pCt->colVal)) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
}
} else {
break;
}
}
taosMemoryFree(pCt);
taosMemoryFree(exBuf);
tSkipListDestroyIter(iter);
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT);
}
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, ct, tr, s, LE);
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, LE);
}
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, ct, tr, s, GT);
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, GT);
}
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, ct, tr, s, GE);
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE);
}
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
RangeType type) {
if (cache == NULL) {
return 0;
}
_cache_range_compare cmpFn = rangeCompare[type];
MemTable* mem = cache;
IndexCache* pCache = mem->pCache;
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version);
int8_t dType = INDEX_TYPE_GET_TYPE(term->colType);
int skip = 0;
char* exBuf = NULL;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
exBuf = indexPackJsonDataPrefix(term, &skip);
pCt->colVal = exBuf;
}
char* key = indexCacheTermGet(pCt);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
if (node == NULL) {
break;
}
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType);
if (cond == MATCH) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
}
} else if (cond == CONTINUE) {
continue;
} else if (cond == BREAK) {
break;
}
}
taosMemoryFree(pCt);
taosMemoryFree(exBuf);
tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
// impl later
return 0;
}
......@@ -167,6 +355,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
};
cache->mem = indexInternalCacheCreate(type);
cache->mem->pCache = cache;
cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
cache->type = type;
cache->index = idx;
......@@ -325,6 +514,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
indexCacheRef(cache);
cache->imm = cache->mem;
cache->mem = indexInternalCacheCreate(cache->type);
cache->mem->pCache = cache;
cache->occupiedMem = 0;
// sched to merge
// unref cache in bgwork
......@@ -379,11 +569,19 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
return 0;
}
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) {
static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s) {
if (mem == NULL) {
return 0;
}
return cacheSearch[qtype](mem, ct, tr, s);
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
return cacheSearch[1][qtype](mem, term, tr, s);
} else {
return cacheSearch[0][qtype](mem, term, tr, s);
}
}
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
int64_t st = taosGetTimestampUs();
......@@ -400,25 +598,24 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result
indexMemRef(imm);
taosThreadMutexUnlock(&pCache->mtx);
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
// SIndexTerm* term = query->term;
// EIndexQueryType qtype = query->qType;
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
char* p = term->colVal;
if (hasJson) {
p = indexPackJsonData(term);
}
CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)};
// bool isJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
// char* p = term->colVal;
// if (isJson) {
// p = indexPackJsonData(term);
//}
// CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)};
int ret = indexQueryMem(mem, &ct, qtype, result, s);
int ret = indexQueryMem(mem, query, result, s);
if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm
ret = indexQueryMem(imm, &ct, qtype, result, s);
}
if (hasJson) {
taosMemoryFreeClear(p);
ret = indexQueryMem(imm, query, result, s);
}
// if (isJson) {
// taosMemoryFreeClear(p);
//}
indexMemUnRef(mem);
indexMemUnRef(imm);
......
......@@ -46,3 +46,30 @@ char* indexPackJsonData(SIndexTerm* itm) {
return buf;
}
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType);
int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1;
char* buf = (char*)taosMemoryCalloc(1, sz);
char* p = buf;
memcpy(p, itm->colName, itm->nColName);
p += itm->nColName;
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
p += sizeof(JSON_VALUE_DELIM);
memcpy(p, &ty, sizeof(ty));
p += sizeof(ty);
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
p += sizeof(JSON_VALUE_DELIM);
*skip = p - buf;
return buf;
}
......@@ -308,20 +308,20 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
}
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0;
char* p = tem->colVal;
uint64_t sz = tem->nColVal;
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0;
char* p = tem->colVal;
int skip = 0;
if (hasJson) {
p = indexPackJsonData(tem);
sz = strlen(p);
p = indexPackJsonDataPrefix(tem, &skip);
}
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
FstSlice h = fstSliceCreate((uint8_t*)p, sz);
FstSlice h = fstSliceCreate((uint8_t*)p, skip);
fstStreamBuilderSetRange(sb, &h, type);
fstSliceDestroy(&h);
......@@ -606,16 +606,16 @@ static bool tfileIteratorNext(Iterate* iiter) {
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
TFileFstIter* tIter = taosMemoryCalloc(1, sizeof(TFileFstIter));
if (tIter == NULL) {
TFileFstIter* iter = taosMemoryCalloc(1, sizeof(TFileFstIter));
if (iter == NULL) {
return NULL;
}
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
tIter->fb = fstSearch(reader->fst, tIter->ctx);
tIter->st = streamBuilderIntoStream(tIter->fb);
tIter->rdr = reader;
return tIter;
iter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
iter->fb = fstSearch(reader->fst, iter->ctx);
iter->st = streamBuilderIntoStream(iter->fb);
iter->rdr = reader;
return iter;
}
Iterate* tfileIteratorCreate(TFileReader* reader) {
......
......@@ -483,7 +483,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
std::string colName("voltage");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexTermQuery query = {term, QUERY_TERM};
......@@ -557,7 +557,7 @@ TEST_F(IndexCacheEnv, cache_test) {
std::string colName("voltage");
{
std::string colVal("v1");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
......@@ -565,28 +565,28 @@ TEST_F(IndexCacheEnv, cache_test) {
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
}
{
std::string colVal("v2");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
}
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
......@@ -595,14 +595,14 @@ TEST_F(IndexCacheEnv, cache_test) {
std::cout << "--------first----------" << std::endl;
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++);
indexTermDestroy(term);
}
{
std::string colVal("v4");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, othColId, version++, suid++);
indexTermDestroy(term);
......@@ -613,7 +613,7 @@ TEST_F(IndexCacheEnv, cache_test) {
std::string colVal("v4");
for (size_t i = 0; i < 10; i++) {
colVal[colVal.size() - 1] = 'a' + i;
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++);
indexTermDestroy(term);
......@@ -623,7 +623,7 @@ TEST_F(IndexCacheEnv, cache_test) {
// begin query
{
std::string colVal("v3");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexTermQuery query = {term, QUERY_TERM};
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
......@@ -638,7 +638,7 @@ TEST_F(IndexCacheEnv, cache_test) {
}
{
std::string colVal("v2");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexTermQuery query = {term, QUERY_TERM};
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
......@@ -670,7 +670,7 @@ class IndexObj {
return ret;
}
void Del(const std::string& colName, const std::string& colVal, uint64_t uid) {
SIndexTerm* term = indexTermCreate(0, DEL_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -679,7 +679,7 @@ class IndexObj {
}
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -701,7 +701,7 @@ class IndexObj {
// opt
tColVal[taosRand() % colValSize] = 'a' + k % 26;
}
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
tColVal.c_str(), tColVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -737,7 +737,7 @@ class IndexObj {
int SearchOne(const std::string& colName, const std::string& colVal) {
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
......@@ -759,7 +759,7 @@ class IndexObj {
}
int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) {
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
......@@ -784,7 +784,7 @@ class IndexObj {
void PutOne(const std::string& colName, const std::string& colVal) {
SIndexMultiTerm* terms = indexMultiTermCreate();
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermAdd(terms, term);
Put(terms, 10);
......@@ -792,7 +792,7 @@ class IndexObj {
}
void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) {
SIndexMultiTerm* terms = indexMultiTermCreate();
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermAdd(terms, term);
Put(terms, val);
......@@ -832,7 +832,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
{
std::string colName("tag1"), colVal("Hello");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -847,7 +847,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
size_t size = 200;
std::string colName("tag1"), colVal("hello");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -862,7 +862,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
size_t size = 200;
std::string colName("tag1"), colVal("Hello");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -877,7 +877,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
{
std::string colName("tag1"), colVal("Hello");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
......
......@@ -40,7 +40,7 @@ TEST_F(JsonEnv, testWrite) {
{
std::string colName("test");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -53,7 +53,7 @@ TEST_F(JsonEnv, testWrite) {
{
std::string colName("voltage");
std::string colVal("ab1");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -66,7 +66,7 @@ TEST_F(JsonEnv, testWrite) {
{
std::string colName("voltage");
std::string colVal("123");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -81,7 +81,7 @@ TEST_F(JsonEnv, testWrite) {
std::string colVal("ab");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
......@@ -95,7 +95,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
{
std::string colName("test");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -110,7 +110,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
std::string colVal("abxxxxxxxxxxxx");
for (int i = 0; i < 1000; i++) {
colVal[i % colVal.size()] = '0' + i % 128;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -124,7 +124,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
{
std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -139,7 +139,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
std::string colVal("ab");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
......
......@@ -784,6 +784,11 @@ static void uvDestroyConn(uv_handle_t* handle) {
tDebug("server conn %p destroy", conn);
// uv_timer_stop(&conn->pTimer);
transQueueDestroy(&conn->srvMsgs);
if (conn->regArg.init == 1) {
transFreeMsg(conn->regArg.msg.pCont);
conn->regArg.init = 0;
}
QUEUE_REMOVE(&conn->queue);
taosMemoryFree(conn->pTcp);
taosMemoryFree(conn);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册