提交 64ffbc1e 编写于 作者: dengyihao's avatar dengyihao

enh: support tag filter

上级 da4ab248
......@@ -194,6 +194,7 @@ void indexInit();
/* index filter */
typedef struct SIndexMetaArg {
void* metaHandle;
void* metaEx;
uint64_t suid;
} SIndexMetaArg;
......
......@@ -80,6 +80,18 @@ int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
typedef struct SMetaFltParam {
tb_uid_t suid;
int16_t cid;
int16_t type;
char * val;
bool reverse;
int (*filterFunc)(void *a, void *b, int16_t type);
} SMetaFltParam;
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *results);
#if 1 // refact APIs below (TODO)
typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg;
......
......@@ -16,8 +16,8 @@
#ifndef _TD_VNODE_META_H_
#define _TD_VNODE_META_H_
#include "vnodeInt.h"
#include "index.h"
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
......@@ -45,8 +45,6 @@ int32_t metaULock(SMeta* pMeta);
int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SDecoder* pCoder, SMetaEntry* pME);
// metaTable ==================
// metaQuery ==================
int metaGetTableEntryByVersion(SMetaReader* pReader, int64_t version, tb_uid_t uid);
......@@ -118,6 +116,10 @@ typedef struct {
int64_t smaUid;
} SSmaIdxKey;
// metaTable ==================
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int8_t type, tb_uid_t uid,
STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey);
#ifndef META_REFACT
// SMetaDB
int metaOpenDB(SMeta* pMeta);
......
......@@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) {
}
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
SMeta *pMeta = pReader->pMeta;
SMeta * pMeta = pReader->pMeta;
STbDbKey tbDbKey = {.version = version, .uid = uid};
// query table.db
......@@ -54,7 +54,7 @@ _err:
}
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
SMeta *pMeta = pReader->pMeta;
SMeta * pMeta = pReader->pMeta;
int64_t version;
// query uid.idx
......@@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
}
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
SMeta *pMeta = pReader->pMeta;
SMeta * pMeta = pReader->pMeta;
tb_uid_t uid;
// query name.idx
......@@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
}
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
void *pData = NULL;
void * pData = NULL;
int nData = 0;
tb_uid_t uid = 0;
......@@ -134,7 +134,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
int metaTbCursorNext(SMTbCursor *pTbCur) {
int ret;
void *pBuf;
void * pBuf;
STbCfg tbCfg;
for (;;) {
......@@ -155,7 +155,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
void *pData = NULL;
void * pData = NULL;
int nData = 0;
int64_t version;
SSchemaWrapper schema = {0};
......@@ -204,11 +204,11 @@ _err:
}
struct SMCtbCursor {
SMeta *pMeta;
TBC *pCur;
SMeta * pMeta;
TBC * pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
void * pKey;
void * pVal;
int kLen;
int vLen;
};
......@@ -280,10 +280,10 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
tb_uid_t quid;
SMetaReader mr = {0};
STSchema *pTSchema = NULL;
STSchema * pTSchema = NULL;
SSchemaWrapper *pSW = NULL;
STSchemaBuilder sb = {0};
SSchema *pSchema;
SSchema * pSchema;
metaReaderInit(&mr, pMeta, 0);
metaGetTableEntryByUid(&mr, uid);
......@@ -320,11 +320,11 @@ int metaGetTbNum(SMeta *pMeta) {
}
typedef struct {
SMeta *pMeta;
TBC *pCur;
SMeta * pMeta;
TBC * pCur;
tb_uid_t uid;
void *pKey;
void *pVal;
void * pKey;
void * pVal;
int kLen;
int vLen;
} SMSmaCursor;
......@@ -396,7 +396,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
STSmaWrapper *pSW = NULL;
SArray *pSmaIds = NULL;
SArray * pSmaIds = NULL;
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
return NULL;
......@@ -420,7 +420,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
metaReaderInit(&mr, pMeta, 0);
int64_t smaId;
int smaIdx = 0;
STSma *pTSma = NULL;
STSma * pTSma = NULL;
for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) {
......@@ -468,7 +468,7 @@ _err:
}
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma *pTSma = NULL;
STSma * pTSma = NULL;
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
......@@ -490,7 +490,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
}
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
SArray *pUids = NULL;
SArray * pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL;
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
......@@ -528,7 +528,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
}
SArray *metaGetSmaTbUids(SMeta *pMeta) {
SArray *pUids = NULL;
SArray * pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL;
tb_uid_t lastUid = 0;
......@@ -575,4 +575,79 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) {
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid) {
ASSERT(pEntry->type == TSDB_CHILD_TABLE);
return tdGetKVRowValOfCol((const SKVRow)pEntry->ctbEntry.pTags, cid);
}
\ No newline at end of file
}
typedef struct {
SMeta * pMeta;
TBC * pCur;
tb_uid_t suid;
int16_t cid;
int16_t type;
void * pKey;
void * pVal;
int32_t kLen;
int32_t vLen;
} SIdxCursor;
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
SIdxCursor *pCursor = NULL;
char *tagData = param->val;
int32_t ret = 0, valid = 0;
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
pCursor->pMeta = pMeta;
pCursor->suid = param->suid;
pCursor->cid = param->cid;
pCursor->type = param->type;
metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
if (ret < 0) {
goto END;
}
STagIdxKey *pKey = NULL;
int32_t nKey = 0;
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, param->val, pCursor->type,
param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
if (ret != 0) {
goto END;
}
int cmp = 0;
if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
goto END;
}
void * entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal;
while (1) {
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
if (valid < 0) {
break;
}
STagIdxKey *p = entryKey;
if (p != NULL) {
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
if (cmp == 0) {
// match
tb_uid_t tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// not match but should continue to iter
} else {
// not match and no more result
break;
}
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) {
break;
}
}
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
taosMemoryFree(pCursor);
return ret;
}
......@@ -721,8 +721,8 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
}
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
int32_t nTagData = 0;
if (pTagData) {
......
......@@ -4495,8 +4495,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = {.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
STimeWindowAggSupp twSup = {
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
......@@ -4510,9 +4510,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else {
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle,
tableIdList, pTableScanNode, pTaskInfo, &twSup, pTableScanNode->tsColId);
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode,
pTaskInfo, &twSup, pTableScanNode->tsColId);
taosArrayDestroy(tableIdList);
return pOperator;
......@@ -4912,7 +4912,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
if (tableType == TSDB_SUPER_TABLE) {
if (pTagCond) {
SIndexMetaArg metaArg = {.metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid};
SIndexMetaArg metaArg = {.metaEx = metaHandle, .metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid};
SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, &metaArg, res);
......
......@@ -12,6 +12,7 @@ target_link_libraries(
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC vnode
PUBLIC nodes
PUBLIC scalar
PUBLIC function
......
......@@ -33,8 +33,9 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
TExeCond tCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b, int8_t dType);
TExeCond tDoCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b);
__compar_fn_t indexGetCompar(int8_t type);
TExeCond tCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b, int8_t dType);
TExeCond tDoCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b);
_cache_range_compare indexGetCompare(RangeType ty);
......
......@@ -80,7 +80,7 @@ static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
static void indexInterResultsDestroy(SArray* results);
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
static int indexMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
......@@ -386,21 +386,21 @@ static void indexInterResultsDestroy(SArray* results) {
taosArrayDestroy(results);
}
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
static int indexMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
// refactor, merge interResults into fResults by oType
for (int i = 0; i < taosArrayGetSize(interResults); i--) {
SArray* t = taosArrayGetP(interResults, i);
for (int i = 0; i < taosArrayGetSize(in); i--) {
SArray* t = taosArrayGetP(in, i);
taosArraySort(t, uidCompare);
taosArrayRemoveDuplicate(t, uidCompare, NULL);
}
if (oType == MUST) {
iIntersection(interResults, fResults);
iIntersection(in, out);
} else if (oType == SHOULD) {
iUnion(interResults, fResults);
iUnion(in, out);
} else if (oType == NOT) {
// just one column index, enhance later
taosArrayAddAll(fResults, interResults);
// taosArrayAddAll(fResults, interResults);
// not use currently
}
return 0;
......
......@@ -75,7 +75,7 @@ char* indexInt2str(int64_t val, char* dst, int radix) {
;
return dst - 1;
}
static __compar_fn_t indexGetCompar(int8_t type) {
__compar_fn_t indexGetCompar(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
return (__compar_fn_t)strcmp;
}
......@@ -182,6 +182,9 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
case QUERY_GREATER_EQUAL: {
if (ret >= 0) return MATCH;
}
case QUERY_TERM: {
if (ret == 0) return MATCH;
}
}
return CONTINUE;
}
......
......@@ -14,11 +14,13 @@
*/
#include "index.h"
#include "indexComm.h"
#include "indexInt.h"
#include "nodes.h"
#include "querynodes.h"
#include "scalar.h"
#include "tdatablock.h"
#include "vnode.h"
// clang-format off
#define SIF_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
......@@ -259,10 +261,52 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
indexError("index-filter not support buildin function");
return TSDB_CODE_QRY_INVALID_INPUT;
}
typedef int (*Filter)(void *a, void *b, int16_t dtype);
int sifGreaterThan(void *a, void *b, int16_t dtype) {
__compar_fn_t func = indexGetCompar(dtype);
return tDoCompare(func, QUERY_GREATER_THAN, a, b);
}
int sifGreaterEqual(void *a, void *b, int16_t dtype) {
__compar_fn_t func = indexGetCompar(dtype);
return tDoCompare(func, QUERY_GREATER_EQUAL, a, b);
}
int sifLessEqual(void *a, void *b, int16_t dtype) {
__compar_fn_t func = indexGetCompar(dtype);
return tDoCompare(func, QUERY_LESS_EQUAL, a, b);
}
int sifLessThan(void *a, void *b, int16_t dtype) {
__compar_fn_t func = indexGetCompar(dtype);
return (int)tDoCompare(func, QUERY_LESS_THAN, a, b);
}
int sifEqual(void *a, void *b, int16_t dtype) {
__compar_fn_t func = indexGetCompar(dtype);
return (int)tDoCompare(func, QUERY_TERM, a, b);
}
static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
if (type == QUERY_LESS_EQUAL || type == QUERY_LESS_THAN) {
*reverse = true;
} else {
*reverse = false;
}
if (type == QUERY_LESS_EQUAL)
return sifLessEqual;
else if (type == QUERY_LESS_THAN)
return sifLessThan;
else if (type == QUERY_GREATER_EQUAL)
return sifGreaterEqual;
else if (type == QUERY_GREATER_THAN)
return sifGreaterThan;
else if (type == QUERY_TERM) {
return sifEqual;
}
return NULL;
}
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
#ifdef USE_INVERTED_INDEX
SIndexMetaArg *arg = &output->arg;
SIndexTerm * tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
#ifdef USE_INVERTED_INDEX
SIndexTerm *tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
right->condValue, strlen(right->condValue));
if (tm == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
......@@ -278,8 +322,22 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
indexMultiTermQueryDestroy(mtm);
return ret;
#else
return 0;
EIndexQueryType qtype = 0;
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
bool reverse;
Filter filterFunc = sifGetFilterFunc(qtype, &reverse);
SMetaFltParam param = {.suid = arg->suid,
.cid = left->colId,
.type = left->colValType,
.val = right->condValue,
.reverse = reverse,
.filterFunc = filterFunc};
int ret = metaFilteTableIds(arg->metaEx, &param, output->result);
return ret;
#endif
return 0;
}
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
......
......@@ -24,8 +24,8 @@ int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
}
return indexPut(index, terms, uid);
// handle put
return indexPut(index, terms, uid);
}
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
......@@ -34,11 +34,11 @@ int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *re
SIndexJsonTerm *p = taosArrayGetP(terms, i);
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
}
return indexSearch(index, tq, result);
// handle search
return indexSearch(index, tq, result);
}
void tIndexJsonClose(SIndexJson *index) {
return indexClose(index);
// handle close
return indexClose(index);
}
......@@ -16,7 +16,7 @@
#include "tdbInt.h"
struct STTB {
TDB *pEnv;
TDB * pEnv;
SBTree *pBt;
};
......@@ -25,11 +25,11 @@ struct STBC {
};
int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) {
TTB *pTb;
TTB * pTb;
SPager *pPager;
int ret;
char fFullName[TDB_FILENAME_LEN];
SPage *pPage;
SPage * pPage;
SPgno pgno;
*ppTb = NULL;
......@@ -145,4 +145,4 @@ int tdbTbcClose(TBC *pTbc) {
return 0;
}
int tdbTbcIsValid(TBC *pTbc) { return tdbBtcIsValid(&pTbc->btc); }
\ No newline at end of file
int tdbTbcIsValid(TBC *pTbc) { return tdbBtcIsValid(&pTbc->btc); }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册