提交 8565cb4f 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

...@@ -53,7 +53,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm ...@@ -53,7 +53,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm
*/ */
int indexOpen(SIndexOpts *opt, const char *path, SIndex **index); int indexOpen(SIndexOpts *opt, const char *path, SIndex **index);
void indexClose(SIndex *index); void indexClose(SIndex *index);
int indexPut(SIndex *index, SIndexMultiTerm *terms, int uid); int indexPut(SIndex *index, SIndexMultiTerm *terms, uint64_t uid);
int indexDelete(SIndex *index, SIndexMultiTermQuery *query); int indexDelete(SIndex *index, SIndexMultiTermQuery *query);
int indexSearch(SIndex *index, SIndexMultiTermQuery *query, SArray *result); int indexSearch(SIndex *index, SIndexMultiTermQuery *query, SArray *result);
int indexRebuild(SIndex *index, SIndexOpts *opt); int indexRebuild(SIndex *index, SIndexOpts *opt);
...@@ -71,7 +71,6 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms); ...@@ -71,7 +71,6 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms);
SIndexOpts *indexOptsCreate(); SIndexOpts *indexOptsCreate();
void indexOptsDestroy(SIndexOpts *opts); void indexOptsDestroy(SIndexOpts *opts);
/* /*
* @param: * @param:
* @param: * @param:
......
...@@ -31,6 +31,16 @@ ...@@ -31,6 +31,16 @@
extern "C" { extern "C" {
#endif #endif
typedef enum {kTypeValue, kTypeDeletion} STermValueType ;
typedef struct SIndexStat {
int32_t totalAdded; //
int32_t totalDeled; //
int32_t totalUpdated; //
int32_t totalTerms; //
int32_t distinctCol; // distinct column
} SIndexStat;
struct SIndex { struct SIndex {
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_t *index; index_t *index;
...@@ -42,6 +52,8 @@ struct SIndex { ...@@ -42,6 +52,8 @@ struct SIndex {
int64_t suid; // current super table id, -1 is normal table int64_t suid; // current super table id, -1 is normal table
int colId; // field id allocated to cache int colId; // field id allocated to cache
int32_t cVersion; // current version allocated to cache int32_t cVersion; // current version allocated to cache
SIndexStat stat;
pthread_mutex_t mtx; pthread_mutex_t mtx;
}; };
...@@ -49,8 +61,12 @@ struct SIndexOpts { ...@@ -49,8 +61,12 @@ struct SIndexOpts {
#ifdef USE_LUCENE #ifdef USE_LUCENE
void *opts; void *opts;
#endif #endif
int32_t numOfItermLimit;
int8_t mergeInterval; #ifdef USE_INVERTED_INDEX
int32_t cacheSize; // MB
// add cache module later
#endif
}; };
struct SIndexMultiTermQuery { struct SIndexMultiTermQuery {
......
...@@ -16,13 +16,14 @@ ...@@ -16,13 +16,14 @@
#define __INDEX_CACHE_H__ #define __INDEX_CACHE_H__
#include "index.h" #include "index.h"
#include "indexInt.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tskiplist.h" #include "tskiplist.h"
// ----------------- row structure in skiplist --------------------- // ----------------- key structure in skiplist ---------------------
/* A data row, the format is like below: /* A data row, the format is like below:
* content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->| * content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<-- int16_t-->|<-- int16_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| * len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
*/ */
#ifdef __cplusplus #ifdef __cplusplus
...@@ -40,11 +41,10 @@ IndexCache *indexCacheCreate(); ...@@ -40,11 +41,10 @@ IndexCache *indexCacheCreate();
void indexCacheDestroy(void *cache); void indexCacheDestroy(void *cache);
int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid);
uint32_t version, uint64_t uid, int8_t operType);
int indexCacheGet(void *cache, uint64_t *rst); //int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result); int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __INDEX_TFILE_H__
#define __INDEX_TFILE_H__
#include "index.h"
#include "indexInt.h"
#include "tlockfree.h"
#include "tskiplist.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct IndexTFile {
T_REF_DECLARE()
} IndexTFile;
IndexTFile *indexTFileCreate();
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
#ifdef __cplusplus
}
#endif
#endif
...@@ -16,12 +16,19 @@ ...@@ -16,12 +16,19 @@
#include "index.h" #include "index.h"
#include "indexInt.h" #include "indexInt.h"
#include "index_cache.h" #include "index_cache.h"
#include "index_tfile.h"
#include "tdef.h"
#ifdef USE_LUCENE #ifdef USE_LUCENE
#include "lucene++/Lucene_c.h" #include "lucene++/Lucene_c.h"
#endif #endif
static int uidCompare(const void *a, const void *b) {
uint64_t u1 = *(uint64_t *)a;
uint64_t u2 = *(uint64_t *)b;
if (u1 == u2) { return 0; }
else { return u1 < u2 ? -1 : 1; }
}
typedef struct SIdxColInfo { typedef struct SIdxColInfo {
int colId; // generated by index internal int colId; // generated by index internal
int cVersion; int cVersion;
...@@ -30,13 +37,13 @@ typedef struct SIdxColInfo { ...@@ -30,13 +37,13 @@ typedef struct SIdxColInfo {
static pthread_once_t isInit = PTHREAD_ONCE_INIT; static pthread_once_t isInit = PTHREAD_ONCE_INIT;
static void indexInit(); static void indexInit();
static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
if (sIdx == NULL) { static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result);
return -1; static int indexMergeCacheIntoTindex(SIndex *sIdx);
}
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); static void indexInterResultsDestroy(SArray *results);
return 0; static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *finalResult);
}
int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) { int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) {
pthread_once(&isInit, indexInit); pthread_once(&isInit, indexInit);
SIndex *sIdx = calloc(1, sizeof(SIndex)); SIndex *sIdx = calloc(1, sizeof(SIndex));
...@@ -73,7 +80,7 @@ void indexClose(SIndex *sIdx) { ...@@ -73,7 +80,7 @@ void indexClose(SIndex *sIdx) {
return; return;
} }
int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) { int indexPut(SIndex *index, SIndexMultiTerm * fVals, uint64_t uid) {
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_document_t *doc = index_document_create(); index_document_t *doc = index_document_create();
...@@ -115,7 +122,7 @@ int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) { ...@@ -115,7 +122,7 @@ int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) {
assert(fi != NULL); assert(fi != NULL);
int32_t colId = fi->colId; int32_t colId = fi->colId;
int32_t version = index->cVersion; int32_t version = index->cVersion;
int ret = indexCachePut(index->cache, colId, p->colType, p->colVal, p->nColVal, version, uid, p->operType); int ret = indexCachePut(index->cache, p, colId, version, uid);
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
...@@ -162,12 +169,25 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result ...@@ -162,12 +169,25 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
#endif #endif
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
SArray *interResults = taosArrayInit(4, POINTER_BYTES);
int nQuery = taosArrayGetSize(multiQuerys->query);
for (size_t i = 0; i < nQuery; i++) {
SIndexTermQuery *qTerm = taosArrayGet(multiQuerys->query, i);
SArray *tResult = NULL;
indexTermSearch(index, qTerm, &tResult);
taosArrayPush(interResults, (void *)&tResult);
}
indexMergeFinalResults(interResults, opera, result);
indexInterResultsDestroy(interResults);
#endif #endif
return 1; return 1;
} }
int indexDelete(SIndex *index, SIndexMultiTermQuery *query) { int indexDelete(SIndex *index, SIndexMultiTermQuery *query) {
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
#endif #endif
...@@ -259,3 +279,75 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms) { ...@@ -259,3 +279,75 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms) {
void indexInit() { void indexInit() {
//do nothing //do nothing
} }
static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result) {
int32_t version = -1;
int16_t colId = -1;
SIdxColInfo *colInfo = NULL;
SIndexTerm *term = query->term;
const char *colName = term->colName;
int32_t nColName = term->nColName;
pthread_mutex_lock(&sIdx->mtx);
colInfo = taosHashGet(sIdx->colObj, colName, nColName);
if (colInfo == NULL) {
pthread_mutex_unlock(&sIdx->mtx);
return -1;
}
colId = colInfo->colId;
version = colInfo->cVersion;
pthread_mutex_unlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t));
//TODO: iterator mem and tidex
STermValueType s;
if (0 == indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) {
if (s == kTypeDeletion) {
indexInfo("col: %s already drop by other opera", term->colName);
// coloum already drop by other oper, no need to query tindex
return 0;
} else {
if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
return -1;
}
}
} else {
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
return -1;
}
return 0;
}
static void indexInterResultsDestroy(SArray *results) {
if (results == NULL) { return; }
size_t sz = taosArrayGetSize(results);
for (size_t i = 0; i < sz; i++) {
SArray *p = taosArrayGetP(results, i);
taosArrayDestroy(p);
}
taosArrayDestroy(results);
}
static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *fResults) {
//refactor, merge interResults into fResults by oType
SArray *first = taosArrayGetP(interResults, 0);
taosArraySort(first, uidCompare);
if (oType == MUST) {
} else if (oType == SHOULD) {
// tag1 condistion || tag2 condition
} else if (oType == NOT) {
// not use currently
}
return 0;
}
static int indexMergeCacheIntoTindex(SIndex *sIdx) {
if (sIdx == NULL) {
return -1;
}
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
return 0;
}
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
#define MAX_INDEX_KEY_LEN 256// test only, change later #define MAX_INDEX_KEY_LEN 256// test only, change later
// ref index_cache.h:22
#define CACHE_KEY_LEN(p) (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
static char* getIndexKey(const void *pData) { static char* getIndexKey(const void *pData) {
return NULL; return NULL;
} }
...@@ -43,7 +46,7 @@ static int32_t compareKey(const void *l, const void *r) { ...@@ -43,7 +46,7 @@ static int32_t compareKey(const void *l, const void *r) {
rp += sizeof(rf); rp += sizeof(rf);
// compare field type // compare field type
int16_t lft, rft; int8_t lft, rft;
memcpy(&lft, lp, sizeof(lft)); memcpy(&lft, lp, sizeof(lft));
memcpy(&rft, rp, sizeof(rft)); memcpy(&rft, rp, sizeof(rft));
lp += sizeof(lft); lp += sizeof(lft);
...@@ -103,14 +106,13 @@ void indexCacheDestroy(void *cache) { ...@@ -103,14 +106,13 @@ void indexCacheDestroy(void *cache) {
free(pCache); free(pCache);
} }
int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid) {
uint32_t version, uint64_t uid, int8_t operType) {
if (cache == NULL) { return -1;} if (cache == NULL) { return -1;}
IndexCache *pCache = cache; IndexCache *pCache = cache;
// encode data // encode data
int32_t total = sizeof(int32_t) + sizeof(fieldId) + sizeof(fieldType) + sizeof(fvLen) + fvLen + sizeof(version) + sizeof(uid) + sizeof(operType); int32_t total = CACHE_KEY_LEN(term);
char *buf = calloc(1, total); char *buf = calloc(1, total);
char *p = buf; char *p = buf;
...@@ -118,16 +120,16 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f ...@@ -118,16 +120,16 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f
memcpy(p, &total, sizeof(total)); memcpy(p, &total, sizeof(total));
p += sizeof(total); p += sizeof(total);
memcpy(p, &fieldId, sizeof(fieldId)); memcpy(p, &colId, sizeof(colId));
p += sizeof(fieldId); p += sizeof(colId);
memcpy(p, &fieldType, sizeof(fieldType)); memcpy(p, &term->colType, sizeof(term->colType));
p += sizeof(fieldType); p += sizeof(term->colType);
memcpy(p, &fvLen, sizeof(fvLen)); memcpy(p, &term->nColVal, sizeof(term->nColVal));
p += sizeof(fvLen); p += sizeof(term->nColVal);
memcpy(p, fieldValue, fvLen); memcpy(p, term->colVal, term->nColVal);
p += fvLen; p += term->nColVal;
memcpy(p, &version, sizeof(version)); memcpy(p, &version, sizeof(version));
p += sizeof(version); p += sizeof(version);
...@@ -135,10 +137,11 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f ...@@ -135,10 +137,11 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f
memcpy(p, &uid, sizeof(uid)); memcpy(p, &uid, sizeof(uid));
p += sizeof(uid); p += sizeof(uid);
memcpy(p, &operType, sizeof(operType)); memcpy(p, &term->operType, sizeof(term->operType));
p += sizeof(operType); p += sizeof(term->operType);
tSkipListPut(pCache->skiplist, (void *)buf); tSkipListPut(pCache->skiplist, (void *)buf);
return 0;
// encode end // encode end
} }
...@@ -146,7 +149,25 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t ...@@ -146,7 +149,25 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t
IndexCache *pCache = cache; IndexCache *pCache = cache;
return 0; return 0;
} }
int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result) { int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s) {
if (cache == NULL) { return -1; }
IndexCache *pCache = cache;
SIndexTerm *term = query->term;
EIndexQueryType qtype = query->qType;
int32_t keyLen = CACHE_KEY_LEN(term);
char *buf = calloc(1, keyLen);
if (qtype == QUERY_TERM) {
} else if (qtype == QUERY_PREFIX) {
} else if (qtype == QUERY_SUFFIX) {
} else if (qtype == QUERY_REGEX) {
}
return 0; return 0;
} }
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "index_tfile.h"
IndexTFile *indexTFileCreate() {
IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
return tfile;
}
void IndexTFileDestroy(IndexTFile *tfile) {
free(tfile);
}
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) {
IndexTFile *ptfile = (IndexTFile *)tfile;
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册