未验证 提交 0b579e58 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9196 from taosdata/feature/index_cache

Feature/index cache
......@@ -53,7 +53,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm
*/
int indexOpen(SIndexOpts *opt, const char *path, SIndex **index);
void indexClose(SIndex *index);
int indexPut(SIndex *index, SIndexMultiTerm *terms, int uid);
int indexPut(SIndex *index, SIndexMultiTerm *terms, uint64_t uid);
int indexDelete(SIndex *index, SIndexMultiTermQuery *query);
int indexSearch(SIndex *index, SIndexMultiTermQuery *query, SArray *result);
int indexRebuild(SIndex *index, SIndexOpts *opt);
......@@ -71,7 +71,6 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms);
SIndexOpts *indexOptsCreate();
void indexOptsDestroy(SIndexOpts *opts);
/*
* @param:
* @param:
......
......@@ -31,6 +31,16 @@
extern "C" {
#endif
typedef enum {kTypeValue, kTypeDeletion} STermValueType ;
typedef struct SIndexStat {
int32_t totalAdded; //
int32_t totalDeled; //
int32_t totalUpdated; //
int32_t totalTerms; //
int32_t distinctCol; // distinct column
} SIndexStat;
struct SIndex {
#ifdef USE_LUCENE
index_t *index;
......@@ -42,15 +52,21 @@ struct SIndex {
int64_t suid; // current super table id, -1 is normal table
int colId; // field id allocated to cache
int32_t cVersion; // current version allocated to cache
SIndexStat stat;
pthread_mutex_t mtx;
};
struct SIndexOpts {
#ifdef USE_LUCENE
void *opts;
#endif
int32_t numOfItermLimit;
int8_t mergeInterval;
#endif
#ifdef USE_INVERTED_INDEX
int32_t cacheSize; // MB
// add cache module later
#endif
};
struct SIndexMultiTermQuery {
......
......@@ -16,13 +16,14 @@
#define __INDEX_CACHE_H__
#include "index.h"
#include "indexInt.h"
#include "tlockfree.h"
#include "tskiplist.h"
// ----------------- row structure in skiplist ---------------------
// ----------------- key structure in skiplist ---------------------
/* A data row, the format is like below:
* content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->|
* len : |<--int32_t -->|<-- int16_t-->|<-- int16_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
* len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
*/
#ifdef __cplusplus
......@@ -40,11 +41,10 @@ IndexCache *indexCacheCreate();
void indexCacheDestroy(void *cache);
int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen,
uint32_t version, uint64_t uid, int8_t operType);
int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid);
int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result);
//int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __INDEX_TFILE_H__
#define __INDEX_TFILE_H__
#include "index.h"
#include "indexInt.h"
#include "tlockfree.h"
#include "tskiplist.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct IndexTFile {
T_REF_DECLARE()
} IndexTFile;
IndexTFile *indexTFileCreate();
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
#ifdef __cplusplus
}
#endif
#endif
......@@ -16,12 +16,19 @@
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
#include "index_tfile.h"
#include "tdef.h"
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
#endif
static int uidCompare(const void *a, const void *b) {
uint64_t u1 = *(uint64_t *)a;
uint64_t u2 = *(uint64_t *)b;
if (u1 == u2) { return 0; }
else { return u1 < u2 ? -1 : 1; }
}
typedef struct SIdxColInfo {
int colId; // generated by index internal
int cVersion;
......@@ -30,13 +37,13 @@ typedef struct SIdxColInfo {
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
static void indexInit();
static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
if (sIdx == NULL) {
return -1;
}
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
return 0;
}
static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result);
static int indexMergeCacheIntoTindex(SIndex *sIdx);
static void indexInterResultsDestroy(SArray *results);
static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *finalResult);
int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) {
pthread_once(&isInit, indexInit);
SIndex *sIdx = calloc(1, sizeof(SIndex));
......@@ -49,8 +56,8 @@ int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) {
sIdx->cache = (void*)indexCacheCreate();
sIdx->tindex = NULL;
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->colId = 1;
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->colId = 1;
sIdx->cVersion = 1;
pthread_mutex_init(&sIdx->mtx, NULL);
......@@ -73,7 +80,7 @@ void indexClose(SIndex *sIdx) {
return;
}
int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) {
int indexPut(SIndex *index, SIndexMultiTerm * fVals, uint64_t uid) {
#ifdef USE_LUCENE
index_document_t *doc = index_document_create();
......@@ -115,7 +122,7 @@ int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) {
assert(fi != NULL);
int32_t colId = fi->colId;
int32_t version = index->cVersion;
int ret = indexCachePut(index->cache, colId, p->colType, p->colVal, p->nColVal, version, uid, p->operType);
int ret = indexCachePut(index->cache, p, colId, version, uid);
if (ret != 0) {
return ret;
}
......@@ -162,12 +169,25 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
#endif
#ifdef USE_INVERTED_INDEX
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
SArray *interResults = taosArrayInit(4, POINTER_BYTES);
int nQuery = taosArrayGetSize(multiQuerys->query);
for (size_t i = 0; i < nQuery; i++) {
SIndexTermQuery *qTerm = taosArrayGet(multiQuerys->query, i);
SArray *tResult = NULL;
indexTermSearch(index, qTerm, &tResult);
taosArrayPush(interResults, (void *)&tResult);
}
indexMergeFinalResults(interResults, opera, result);
indexInterResultsDestroy(interResults);
#endif
return 1;
}
int indexDelete(SIndex *index, SIndexMultiTermQuery *query) {
#ifdef USE_INVERTED_INDEX
#endif
......@@ -259,3 +279,75 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms) {
void indexInit() {
//do nothing
}
static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result) {
int32_t version = -1;
int16_t colId = -1;
SIdxColInfo *colInfo = NULL;
SIndexTerm *term = query->term;
const char *colName = term->colName;
int32_t nColName = term->nColName;
pthread_mutex_lock(&sIdx->mtx);
colInfo = taosHashGet(sIdx->colObj, colName, nColName);
if (colInfo == NULL) {
pthread_mutex_unlock(&sIdx->mtx);
return -1;
}
colId = colInfo->colId;
version = colInfo->cVersion;
pthread_mutex_unlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t));
//TODO: iterator mem and tidex
STermValueType s;
if (0 == indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) {
if (s == kTypeDeletion) {
indexInfo("col: %s already drop by other opera", term->colName);
// coloum already drop by other oper, no need to query tindex
return 0;
} else {
if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
return -1;
}
}
} else {
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
return -1;
}
return 0;
}
static void indexInterResultsDestroy(SArray *results) {
if (results == NULL) { return; }
size_t sz = taosArrayGetSize(results);
for (size_t i = 0; i < sz; i++) {
SArray *p = taosArrayGetP(results, i);
taosArrayDestroy(p);
}
taosArrayDestroy(results);
}
static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *fResults) {
//refactor, merge interResults into fResults by oType
SArray *first = taosArrayGetP(interResults, 0);
taosArraySort(first, uidCompare);
if (oType == MUST) {
} else if (oType == SHOULD) {
// tag1 condistion || tag2 condition
} else if (oType == NOT) {
// not use currently
}
return 0;
}
static int indexMergeCacheIntoTindex(SIndex *sIdx) {
if (sIdx == NULL) {
return -1;
}
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
return 0;
}
......@@ -18,6 +18,9 @@
#define MAX_INDEX_KEY_LEN 256// test only, change later
// ref index_cache.h:22
#define CACHE_KEY_LEN(p) (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
static char* getIndexKey(const void *pData) {
return NULL;
}
......@@ -43,7 +46,7 @@ static int32_t compareKey(const void *l, const void *r) {
rp += sizeof(rf);
// compare field type
int16_t lft, rft;
int8_t lft, rft;
memcpy(&lft, lp, sizeof(lft));
memcpy(&rft, rp, sizeof(rft));
lp += sizeof(lft);
......@@ -103,14 +106,13 @@ void indexCacheDestroy(void *cache) {
free(pCache);
}
int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen,
uint32_t version, uint64_t uid, int8_t operType) {
int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid) {
if (cache == NULL) { return -1;}
IndexCache *pCache = cache;
// encode data
int32_t total = sizeof(int32_t) + sizeof(fieldId) + sizeof(fieldType) + sizeof(fvLen) + fvLen + sizeof(version) + sizeof(uid) + sizeof(operType);
int32_t total = CACHE_KEY_LEN(term);
char *buf = calloc(1, total);
char *p = buf;
......@@ -118,16 +120,16 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f
memcpy(p, &total, sizeof(total));
p += sizeof(total);
memcpy(p, &fieldId, sizeof(fieldId));
p += sizeof(fieldId);
memcpy(p, &colId, sizeof(colId));
p += sizeof(colId);
memcpy(p, &fieldType, sizeof(fieldType));
p += sizeof(fieldType);
memcpy(p, &term->colType, sizeof(term->colType));
p += sizeof(term->colType);
memcpy(p, &fvLen, sizeof(fvLen));
p += sizeof(fvLen);
memcpy(p, fieldValue, fvLen);
p += fvLen;
memcpy(p, &term->nColVal, sizeof(term->nColVal));
p += sizeof(term->nColVal);
memcpy(p, term->colVal, term->nColVal);
p += term->nColVal;
memcpy(p, &version, sizeof(version));
p += sizeof(version);
......@@ -135,10 +137,11 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f
memcpy(p, &uid, sizeof(uid));
p += sizeof(uid);
memcpy(p, &operType, sizeof(operType));
p += sizeof(operType);
memcpy(p, &term->operType, sizeof(term->operType));
p += sizeof(term->operType);
tSkipListPut(pCache->skiplist, (void *)buf);
return 0;
// encode end
}
......@@ -146,7 +149,25 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t
IndexCache *pCache = cache;
return 0;
}
int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result) {
return 0;
int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s) {
if (cache == NULL) { return -1; }
IndexCache *pCache = cache;
SIndexTerm *term = query->term;
EIndexQueryType qtype = query->qType;
int32_t keyLen = CACHE_KEY_LEN(term);
char *buf = calloc(1, keyLen);
if (qtype == QUERY_TERM) {
} else if (qtype == QUERY_PREFIX) {
} else if (qtype == QUERY_SUFFIX) {
} else if (qtype == QUERY_REGEX) {
}
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "index_tfile.h"
IndexTFile *indexTFileCreate() {
IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
return tfile;
}
void IndexTFileDestroy(IndexTFile *tfile) {
free(tfile);
}
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) {
IndexTFile *ptfile = (IndexTFile *)tfile;
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册