提交 02507ddb 编写于 作者: dengyihao's avatar dengyihao

add update index

上级 c368e596
......@@ -96,8 +96,10 @@ typedef struct SIndexTermQuery {
typedef struct Iterate Iterate;
typedef struct IterateValue {
int8_t type; // opera type, ADD_VALUE/DELETE_VALUE
char* colVal;
int8_t type; // opera type, ADD_VALUE/DELETE_VALUE
uint64_t ver; // data ver, tfile data version is 0
char* colVal;
SArray* val;
} IterateValue;
......
......@@ -16,6 +16,7 @@
#define __INDEX_CACHE_H__
#include "indexInt.h"
#include "index_util.h"
#include "tskiplist.h"
// ----------------- key structure in skiplist ---------------------
......@@ -52,8 +53,9 @@ typedef struct CacheTerm {
char* colVal;
int32_t version;
// value
uint64_t uid;
int8_t colType;
uint64_t uid;
int8_t colType;
SIndexOperOnColumn operaType;
} CacheTerm;
//
......@@ -68,7 +70,7 @@ void indexCacheIteratorDestroy(Iterate* iiter);
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
// int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s);
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s);
void indexCacheRef(IndexCache* cache);
void indexCacheUnRef(IndexCache* cache);
......
......@@ -19,6 +19,7 @@
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tlockfree.h"
#ifdef __cplusplus
......@@ -103,7 +104,7 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx);
void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr);
void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader);
......@@ -118,7 +119,7 @@ int tfileWriterFinish(TFileWriter* tw);
IndexTFile* indexTFileCreate(const char* path);
void indexTFileDestroy(IndexTFile* tfile);
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* tr);
Iterate* tfileIteratorCreate(TFileReader* reader);
void tfileIteratorDestroy(Iterate* iterator);
......
......@@ -47,6 +47,19 @@ extern "C" {
buf += len; \
} while (0)
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
{ \
bool f = false; \
for (int i = 0; i < taosArrayGetSize(src); i++) { \
if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \
f = true; \
} \
} \
if (f == false) { \
taosArrayPush(dst, &tgt); \
} \
}
/* multi sorted result intersection
* input: [1, 2, 4, 5]
* [2, 3, 4, 5]
......@@ -66,10 +79,32 @@ void iUnion(SArray *interResults, SArray *finalResult);
/* sorted array
* total: [1, 2, 4, 5, 7, 8]
* except: [4, 5]
* return: [1, 2, 7, 8]
* return: [1, 2, 7, 8] saved in total
*/
void iExcept(SArray *total, SArray *except);
int uidCompare(const void *a, const void *b);
// data with ver
typedef struct {
uint32_t ver;
uint64_t data;
} SIdxVerdata;
typedef struct {
SArray *total;
SArray *added;
SArray *deled;
} SIdxTempResult;
SIdxTempResult *sIdxTempResultCreate();
void sIdxTempResultClear(SIdxTempResult *tr);
void sIdxTempResultDestroy(SIdxTempResult *tr);
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr);
#ifdef __cplusplus
}
#endif
......
......@@ -31,18 +31,6 @@
void* indexQhandle = NULL;
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
{ \
bool f = false; \
for (int i = 0; i < taosArrayGetSize(src); i++) { \
if (*(uint64_t*)taosArrayGet(src, i) == tgt) { \
f = true; \
} \
} \
if (f == false) { \
taosArrayPush(dst, &tgt); \
} \
}
void indexInit() {
// refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
......@@ -52,23 +40,11 @@ void indexCleanUp() {
taosCleanUpScheduler(indexQhandle);
}
static int uidCompare(const void* a, const void* b) {
// add more version compare
uint64_t u1 = *(uint64_t*)a;
uint64_t u2 = *(uint64_t*)b;
return u1 - u2;
}
typedef struct SIdxColInfo {
int colId; // generated by index internal
int cVersion;
} SIdxColInfo;
typedef struct SIdxTempResult {
SArray* total;
SArray* added;
SArray* deled;
} SIdxTempResult;
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
// static void indexInit();
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
......@@ -255,6 +231,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
#ifdef USE_INVERTED_INDEX
#endif
return 1;
......@@ -363,22 +340,30 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
*result = taosArrayInit(4, sizeof(uint64_t));
// TODO: iterator mem and tidex
STermValueType s = kTypeValue;
if (0 == indexCacheSearch(cache, query, *result, &s)) {
SIdxTempResult* tr = sIdxTempResultCreate();
if (0 == indexCacheSearch(cache, query, tr, &s)) {
if (s == kTypeDeletion) {
indexInfo("col: %s already drop by", term->colName);
// coloum already drop by other oper, no need to query tindex
return 0;
} else {
if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
if (0 != indexTFileSearch(sIdx->tindex, query, tr)) {
indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
return -1;
goto END;
}
}
} else {
indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
return -1;
goto END;
}
sIdxTempResultMergeTo(*result, tr);
sIdxTempResultDestroy(tr);
return 0;
END:
sIdxTempResultDestroy(tr);
return -1;
}
static void indexInterResultsDestroy(SArray* results) {
if (results == NULL) {
......@@ -413,43 +398,6 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
return 0;
}
SIdxTempResult* sIdxTempResultCreate() {
SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult));
tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->added = taosArrayInit(4, sizeof(uint64_t));
tr->deled = taosArrayInit(4, sizeof(uint64_t));
return tr;
}
void sIdxTempResultClear(SIdxTempResult* tr) {
if (tr == NULL) {
return;
}
taosArrayClear(tr->total);
taosArrayClear(tr->added);
taosArrayClear(tr->deled);
}
void sIdxTempResultDestroy(SIdxTempResult* tr) {
if (tr == NULL) {
return;
}
taosArrayDestroy(tr->total);
taosArrayDestroy(tr->added);
taosArrayDestroy(tr->deled);
}
static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) {
taosArraySort(tr->total, uidCompare);
taosArraySort(tr->added, uidCompare);
taosArraySort(tr->deled, uidCompare);
SArray* arrs = taosArrayInit(2, sizeof(void*));
taosArrayPush(arrs, &tr->total);
taosArrayPush(arrs, &tr->added);
iUnion(arrs, result);
taosArrayDestroy(arrs);
iExcept(result, tr->deled);
}
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
int32_t sz = taosArrayGetSize(result);
if (sz > 0) {
......@@ -478,6 +426,7 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateVal
if (cv != NULL) {
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
uint32_t ver = cv->ver;
if (cv->type == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id)
} else if (cv->type == DEL_VALUE) {
......
......@@ -256,7 +256,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
return 0;
}
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) {
if (mem == NULL) {
return 0;
}
......@@ -267,28 +267,23 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
SSkipListNode* node = tSkipListIterGet(iter);
if (node != NULL) {
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
// if (c->operaType == ADD_VALUE) {
//} else if (c->operaType == DEL_VALUE) {
//}
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
if (strcmp(c->colVal, ct->colVal) == 0) {
taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else {
break;
if (qtype == QUERY_TERM) {
if (0 == strcmp(c->colVal, ct->colVal)) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
}
}
} else if (c->operaType == DEL_VALUE) {
// table is del, not need
*s = kTypeDeletion;
break;
}
}
}
tSkipListDestroyIter(iter);
return 0;
}
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
if (cache == NULL) {
return 0;
}
......@@ -416,6 +411,7 @@ static bool indexCacheIteratorNext(Iterate* itera) {
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
iv->type = ct->operaType;
iv->ver = ct->version;
iv->colVal = tstrdup(ct->colVal);
taosArrayPush(iv->val, &ct->uid);
......
......@@ -184,12 +184,13 @@ void tfileReaderDestroy(TFileReader* reader) {
free(reader);
}
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
SIndexTerm* term = query->term;
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
EIndexQueryType qtype = query->qType;
int ret = -1;
SArray* result = taosArrayInit(16, sizeof(uint64_t));
int ret = -1;
// refactor to callback later
if (qtype == QUERY_TERM) {
uint64_t offset;
......@@ -223,6 +224,10 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
// handle later
}
tfileReaderUnRef(reader);
taosArrayAddAll(tr->total, result);
taosArrayDestroy(result);
return ret;
}
......@@ -248,7 +253,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
if (wc == NULL) {
return NULL;
}
......@@ -380,7 +385,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
free(tfile);
}
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result) {
int ret = -1;
if (tfile == NULL) {
return ret;
......@@ -428,6 +433,7 @@ static bool tfileIteratorNext(Iterate* iiter) {
return false;
}
iv->ver = 0;
iv->type = ADD_VALUE; // value in tfile always ADD_VALUE
iv->colVal = colVal;
return true;
......@@ -628,7 +634,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
int64_t ts = taosGetTimestampUs();
int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
int64_t cost = taosGetTimestampUs() - ts;
indexInfo("nread = %d, and fst offset=%d, size: %d, filename: %s, size: %d, time cost: %" PRId64 "us", nread,
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
// we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread <= fstSize);
......
......@@ -14,6 +14,7 @@
*/
#include "index_util.h"
#include "index.h"
#include "tcompare.h"
typedef struct MergeIndex {
int idx;
......@@ -135,3 +136,60 @@ void iExcept(SArray *total, SArray *except) {
taosArrayPopTailBatch(total, tsz - vIdx);
}
int uidCompare(const void *a, const void *b) {
// add more version compare
uint64_t u1 = *(uint64_t *)a;
uint64_t u2 = *(uint64_t *)b;
return u1 - u2;
}
int verdataCompare(const void *a, const void *b) {
SIdxVerdata *va = (SIdxVerdata *)a;
SIdxVerdata *vb = (SIdxVerdata *)b;
int32_t cmp = compareUint64Val(&va->data, &vb->data);
if (cmp == 0) {
cmp = 0 - compareUint32Val(&va->ver, &vb->data);
return cmp;
}
return cmp;
}
SIdxTempResult *sIdxTempResultCreate() {
SIdxTempResult *tr = calloc(1, sizeof(SIdxTempResult));
tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->added = taosArrayInit(4, sizeof(uint64_t));
tr->deled = taosArrayInit(4, sizeof(uint64_t));
return tr;
}
void sIdxTempResultClear(SIdxTempResult *tr) {
if (tr == NULL) {
return;
}
taosArrayClear(tr->total);
taosArrayClear(tr->added);
taosArrayClear(tr->deled);
}
void sIdxTempResultDestroy(SIdxTempResult *tr) {
if (tr == NULL) {
return;
}
taosArrayDestroy(tr->total);
taosArrayDestroy(tr->added);
taosArrayDestroy(tr->deled);
}
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) {
taosArraySort(tr->total, uidCompare);
taosArraySort(tr->added, uidCompare);
taosArraySort(tr->deled, uidCompare);
SArray *arrs = taosArrayInit(2, sizeof(void *));
taosArrayPush(arrs, &tr->total);
taosArrayPush(arrs, &tr->added);
iUnion(arrs, result);
taosArrayDestroy(arrs);
iExcept(result, tr->deled);
}
......@@ -24,6 +24,7 @@
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tskiplist.h"
#include "tutil.h"
using namespace std;
......@@ -393,7 +394,13 @@ class TFileObj {
//
//
}
return tfileReaderSearch(reader_, query, result);
SIdxTempResult* tr = sIdxTempResultCreate();
int ret = tfileReaderSearch(reader_, query, tr);
sIdxTempResultMergeTo(result, tr);
sIdxTempResultDestroy(tr);
return ret;
}
~TFileObj() {
if (writer_) {
......@@ -507,9 +514,13 @@ class CacheObj {
indexCacheDebug(cache);
}
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
int ret = indexCacheSearch(cache, query, result, s);
SIdxTempResult* tr = sIdxTempResultCreate();
int ret = indexCacheSearch(cache, query, tr, s);
sIdxTempResultMergeTo(result, tr);
sIdxTempResultDestroy(tr);
if (ret != 0) {
//
std::cout << "failed to get from cache:" << ret << std::endl;
}
return ret;
......@@ -649,7 +660,7 @@ class IndexObj {
indexInit();
}
int Init(const std::string& dir) {
// taosRemoveDir(dir.c_str());
taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str());
int ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) {
......@@ -658,6 +669,14 @@ class IndexObj {
}
return ret;
}
void Del(const std::string& colName, const std::string& colVal, uint64_t uid) {
SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
Put(terms, uid);
indexMultiTermDestroy(terms);
}
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
......@@ -730,6 +749,7 @@ class IndexObj {
std::cout << "search and time cost:" << e - s << "\tquery col:" << colName << "\t val: " << colVal
<< "\t size:" << taosArrayGetSize(result) << std::endl;
} else {
return -1;
}
int sz = taosArrayGetSize(result);
indexMultiTermQueryDestroy(mq);
......@@ -797,13 +817,9 @@ class IndexObj {
class IndexEnv2 : public ::testing::Test {
protected:
virtual void SetUp() {
index = new IndexObj();
}
virtual void TearDown() {
delete index;
}
IndexObj* index;
virtual void SetUp() { index = new IndexObj(); }
virtual void TearDown() { delete index; }
IndexObj* index;
};
TEST_F(IndexEnv2, testIndexOpen) {
std::string path = "/tmp/test";
......@@ -1042,3 +1058,19 @@ TEST_F(IndexEnv2, testIndex_read_performance4) {
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
assert(3 == index->SearchOne("tag10", "Hello"));
}
TEST_F(IndexEnv2, testIndex_del) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {
}
for (int i = 0; i < 100; i++) {
index->PutOneTarge("tag10", "Hello", i);
}
index->Del("tag10", "Hello", 12);
index->Del("tag10", "Hello", 11);
index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000);
EXPECT_EQ(98, index->SearchOne("tag10", "Hello"));
// std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
// assert(3 == index->SearchOne("tag10", "Hello"));
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册