diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 3ca8d106033795f50a5090619395f02da55c2ede..2535ec8a5b31f460dc19befb8274652a75942112 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -53,7 +53,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm */ int indexOpen(SIndexOpts *opt, const char *path, SIndex **index); void indexClose(SIndex *index); -int indexPut(SIndex *index, SIndexMultiTerm *terms, int uid); +int indexPut(SIndex *index, SIndexMultiTerm *terms, uint64_t uid); int indexDelete(SIndex *index, SIndexMultiTermQuery *query); int indexSearch(SIndex *index, SIndexMultiTermQuery *query, SArray *result); int indexRebuild(SIndex *index, SIndexOpts *opt); @@ -71,7 +71,6 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms); SIndexOpts *indexOptsCreate(); void indexOptsDestroy(SIndexOpts *opts); - /* * @param: * @param: diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 7e017049e849c898a7c61bd390a005a3742ed10e..a258cb834dcbe4a15845c690b706d46b09674c78 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -31,6 +31,16 @@ extern "C" { #endif +typedef enum {kTypeValue, kTypeDeletion} STermValueType ; // result kind reported by indexCacheSearch; on kTypeDeletion indexTermSearch skips the TFile lookup + +typedef struct SIndexStat { // index counters; NOTE(review): no visible hunk in this patch updates them - confirm where they are maintained + int32_t totalAdded; // + int32_t totalDeled; // + int32_t totalUpdated; // + int32_t totalTerms; // + int32_t distinctCol; // distinct column +} SIndexStat; + struct SIndex { #ifdef USE_LUCENE index_t *index; @@ -42,15 +52,21 @@ struct SIndex { int64_t suid; // current super table id, -1 is normal table int colId; // field id allocated to cache int32_t cVersion; // current version allocated to cache + + SIndexStat stat; pthread_mutex_t mtx; }; struct SIndexOpts { #ifdef USE_LUCENE void *opts; -#endif - int32_t numOfItermLimit; - int8_t mergeInterval; +#endif + +#ifdef USE_INVERTED_INDEX + int32_t cacheSize; // cache size in MB + // add cache module later +#endif + }; struct SIndexMultiTermQuery { diff --git 
a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index b952e16a8e0929d61595f1d833e045b261acfa74..97a7b835f6a5afb348847276ba427a11b2113c83 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -16,13 +16,14 @@ #define __INDEX_CACHE_H__ #include "index.h" +#include "indexInt.h" #include "tlockfree.h" #include "tskiplist.h" -// ----------------- row structure in skiplist --------------------- +// ----------------- key structure in skiplist --------------------- /* A data row, the format is like below: * content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->| - * len : |<--int32_t -->|<-- int16_t-->|<-- int16_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| + * len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| NOTE(review): indexCachePut copies version before uid, the reverse of the order listed here - confirm which order is intended */ #ifdef __cplusplus @@ -40,11 +41,10 @@ IndexCache *indexCacheCreate(); void indexCacheDestroy(void *cache); -int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, - uint32_t version, uint64_t uid, int8_t operType); +int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid); -int indexCacheGet(void *cache, uint64_t *rst); -int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result); +//int indexCacheGet(void *cache, uint64_t *rst); +int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s); #ifdef __cplusplus } diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h new file mode 100644 index 0000000000000000000000000000000000000000..c3f4bd25e5e6a792120f1ac8880bfbc57915dfef --- /dev/null +++ b/source/libs/index/inc/index_tfile.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. 
+ * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#ifndef __INDEX_TFILE_H__ +#define __INDEX_TFILE_H__ + +#include "index.h" +#include "indexInt.h" +#include "tlockfree.h" +#include "tskiplist.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct IndexTFile { + T_REF_DECLARE() +} IndexTFile; + + + +IndexTFile *indexTFileCreate(); + + +int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result); + +#ifdef __cplusplus +} + + +#endif + + + +#endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 08c59d8d4379abde6d252ecebe516b27eb62f870..ec83e84a3b22541fb1ba7906551d94a906917ea0 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -16,12 +16,19 @@ #include "index.h" #include "indexInt.h" #include "index_cache.h" +#include "index_tfile.h" +#include "tdef.h" #ifdef USE_LUCENE #include "lucene++/Lucene_c.h" #endif - +static int uidCompare(const void *a, const void *b) { // ascending comparator over uint64_t uids; passed to taosArraySort in indexMergeFinalResults + uint64_t u1 = *(uint64_t *)a; + uint64_t u2 = *(uint64_t *)b; + if (u1 == u2) { return 0; } + else { return u1 < u2 ? 
-1 : 1; } +} typedef struct SIdxColInfo { int colId; // generated by index internal int cVersion; @@ -30,13 +37,13 @@ typedef struct SIdxColInfo { static pthread_once_t isInit = PTHREAD_ONCE_INIT; static void indexInit(); -static int indexMergeCacheIntoTindex(struct SIndex *sIdx) { - if (sIdx == NULL) { - return -1; - } - indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); - return 0; -} + +static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result); +static int indexMergeCacheIntoTindex(SIndex *sIdx); + +static void indexInterResultsDestroy(SArray *results); +static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *finalResult); + int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) { pthread_once(&isInit, indexInit); SIndex *sIdx = calloc(1, sizeof(SIndex)); @@ -49,8 +56,8 @@ int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) { sIdx->cache = (void*)indexCacheCreate(); sIdx->tindex = NULL; - sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - sIdx->colId = 1; + sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + sIdx->colId = 1; sIdx->cVersion = 1; pthread_mutex_init(&sIdx->mtx, NULL); @@ -73,7 +80,7 @@ void indexClose(SIndex *sIdx) { return; } -int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) { +int indexPut(SIndex *index, SIndexMultiTerm * fVals, uint64_t uid) { #ifdef USE_LUCENE index_document_t *doc = index_document_create(); @@ -115,7 +122,7 @@ int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) { assert(fi != NULL); int32_t colId = fi->colId; int32_t version = index->cVersion; - int ret = indexCachePut(index->cache, colId, p->colType, p->colVal, p->nColVal, version, uid, p->operType); + int ret = indexCachePut(index->cache, p, colId, version, uid); if (ret != 0) { return ret; } @@ -162,12 +169,25 @@ int indexSearch(SIndex *index, 
SIndexMultiTermQuery *multiQuerys, SArray *result #endif #ifdef USE_INVERTED_INDEX + EIndexOperatorType opera = multiQuerys->opera; // relation of queries + SArray *interResults = taosArrayInit(4, POINTER_BYTES); + int nQuery = taosArrayGetSize(multiQuerys->query); + for (size_t i = 0; i < nQuery; i++) { + SIndexTermQuery *qTerm = taosArrayGet(multiQuerys->query, i); + SArray *tResult = NULL; + indexTermSearch(index, qTerm, &tResult); + taosArrayPush(interResults, (void *)&tResult); + } + indexMergeFinalResults(interResults, opera, result); + indexInterResultsDestroy(interResults); + #endif return 1; } + int indexDelete(SIndex *index, SIndexMultiTermQuery *query) { #ifdef USE_INVERTED_INDEX #endif @@ -259,3 +279,75 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms) { void indexInit() { //do nothing } +static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result) { + int32_t version = -1; + int16_t colId = -1; + SIdxColInfo *colInfo = NULL; + + SIndexTerm *term = query->term; + const char *colName = term->colName; + int32_t nColName = term->nColName; + + pthread_mutex_lock(&sIdx->mtx); + colInfo = taosHashGet(sIdx->colObj, colName, nColName); + if (colInfo == NULL) { + pthread_mutex_unlock(&sIdx->mtx); + return -1; + } + colId = colInfo->colId; + version = colInfo->cVersion; + pthread_mutex_unlock(&sIdx->mtx); + + *result = taosArrayInit(4, sizeof(uint64_t)); + //TODO: iterator over mem and tindex + STermValueType s = kTypeValue; // indexCacheSearch currently never stores to *s; initialize so the check below never reads an indeterminate value + if (0 == indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) { + if (s == kTypeDeletion) { + indexInfo("col: %s already drop by other opera", term->colName); + // column already dropped by another operation, no need to query tindex + return 0; + } else { + if (0 != indexTFileSearch(sIdx->tindex, query, *result)) { + indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal); + return -1; + } + } + } else { + indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal); + return -1; + } + 
return 0; +} +static void indexInterResultsDestroy(SArray *results) { + if (results == NULL) { return; } + + size_t sz = taosArrayGetSize(results); + for (size_t i = 0; i < sz; i++) { + SArray *p = taosArrayGetP(results, i); + taosArrayDestroy(p); + } + taosArrayDestroy(results); + +} +static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *fResults) { + //refactor, merge interResults into fResults by oType; the list may be empty and entries may be NULL (indexSearch pushes NULL when a term lookup fails) + SArray *first = taosArrayGetSize(interResults) > 0 ? taosArrayGetP(interResults, 0) : NULL; + if (first != NULL) { taosArraySort(first, uidCompare); } + if (oType == MUST) { + + } else if (oType == SHOULD) { + + // tag1 condition || tag2 condition + } else if (oType == NOT) { + + // not use currently + } + return 0; +} +static int indexMergeCacheIntoTindex(SIndex *sIdx) { + if (sIdx == NULL) { + return -1; + } + indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); + return 0; +} diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 23f7a088231186d599b4d850512ac8302a89d9ac..3c52275a4c96f2f0f12f6fae012c9681f2a37c99 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -18,6 +18,9 @@ #define MAX_INDEX_KEY_LEN 256// test only, change later +// ref index_cache.h:22; must cover every field indexCachePut writes: total, colId, colType, nColVal, colVal, version (int32_t), uid, operType +#define CACHE_KEY_LEN(p) (sizeof(int32_t) + sizeof(uint16_t) + sizeof((p)->colType) + sizeof((p)->nColVal) + (p)->nColVal + sizeof(int32_t) + sizeof(uint64_t) + sizeof((p)->operType)) + static char* getIndexKey(const void *pData) { return NULL; } @@ -43,7 +46,7 @@ static int32_t compareKey(const void *l, const void *r) { rp += sizeof(rf); // compare field type - int16_t lft, rft; + int8_t lft, rft; memcpy(&lft, lp, sizeof(lft)); memcpy(&rft, rp, sizeof(rft)); lp += sizeof(lft); @@ -103,14 +106,13 @@ void indexCacheDestroy(void *cache) { free(pCache); } -int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, - uint32_t version, uint64_t uid, int8_t operType) { +int indexCachePut(void *cache, SIndexTerm *term, int16_t 
colId, int32_t version, uint64_t uid) { if (cache == NULL) { return -1;} IndexCache *pCache = cache; // encode data - int32_t total = sizeof(int32_t) + sizeof(fieldId) + sizeof(fieldType) + sizeof(fvLen) + fvLen + sizeof(version) + sizeof(uid) + sizeof(operType); + int32_t total = CACHE_KEY_LEN(term); char *buf = calloc(1, total); char *p = buf; @@ -118,16 +120,16 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f memcpy(p, &total, sizeof(total)); p += sizeof(total); - memcpy(p, &fieldId, sizeof(fieldId)); - p += sizeof(fieldId); + memcpy(p, &colId, sizeof(colId)); + p += sizeof(colId); - memcpy(p, &fieldType, sizeof(fieldType)); - p += sizeof(fieldType); + memcpy(p, &term->colType, sizeof(term->colType)); + p += sizeof(term->colType); - memcpy(p, &fvLen, sizeof(fvLen)); - p += sizeof(fvLen); - memcpy(p, fieldValue, fvLen); - p += fvLen; + memcpy(p, &term->nColVal, sizeof(term->nColVal)); + p += sizeof(term->nColVal); + memcpy(p, term->colVal, term->nColVal); + p += term->nColVal; memcpy(p, &version, sizeof(version)); p += sizeof(version); @@ -135,10 +137,11 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f memcpy(p, &uid, sizeof(uid)); p += sizeof(uid); - memcpy(p, &operType, sizeof(operType)); - p += sizeof(operType); + memcpy(p, &term->operType, sizeof(term->operType)); + p += sizeof(term->operType); tSkipListPut(pCache->skiplist, (void *)buf); + return 0; // encode end } @@ -146,7 +149,25 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t IndexCache *pCache = cache; return 0; } -int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result) { - - return 0; +int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s) { + if (cache == NULL) { return -1; } + IndexCache *pCache = cache; + SIndexTerm *term = query->term; + EIndexQueryType qtype = query->qType; + + int32_t keyLen = 
CACHE_KEY_LEN(term); + + char *buf = calloc(1, keyLen); // scratch buffer for the lookup key; not filled in yet + if (qtype == QUERY_TERM) { + + } else if (qtype == QUERY_PREFIX) { + + } else if (qtype == QUERY_SUFFIX) { + + } else if (qtype == QUERY_REGEX) { + + } + free(buf); // the scratch key is not retained anywhere, release it before returning + return 0; } + diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c new file mode 100644 index 0000000000000000000000000000000000000000..a1bba56391ce445899fe1fc23b8e422c8cf3cd8f --- /dev/null +++ b/source/libs/index/src/index_tfile.c @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "index_tfile.h" + +IndexTFile *indexTFileCreate() { + IndexTFile *tfile = calloc(1, sizeof(IndexTFile)); + return tfile; +} +void IndexTFileDestroy(IndexTFile *tfile) { // NOTE(review): not declared in index_tfile.h, and capitalised differently from indexTFileCreate + free(tfile); +} +int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) { + IndexTFile *ptfile = (IndexTFile *)tfile; + return 0; +} + + +