提交 7e3e6022 编写于 作者: dengyihao's avatar dengyihao

complete index write/search

上级 f99f6ec8
......@@ -51,6 +51,8 @@ struct SIndex {
int64_t suid; // current super table id, -1 is normal table
int32_t cVersion; // current version allocated to cache
char* path;
SIndexStat stat;
pthread_mutex_t mtx;
};
......@@ -87,12 +89,23 @@ typedef struct SIndexTermQuery {
EIndexQueryType qType;
} SIndexTermQuery;
typedef struct Iterate {
void* iter;
typedef struct Iterate Iterate;
typedef struct IterateValue {
int8_t type;
char* colVal;
SArray* val;
} IterateValue;
typedef struct Iterate {
void* iter;
IterateValue val;
bool (*next)(Iterate* iter);
IterateValue* (*getValue)(Iterate* iter);
} Iterate;
void iterateValueDestroy(IterateValue* iv, bool destroy);
extern void* indexQhandle;
int indexFlushCacheTFile(SIndex* sIdx, void*);
......
......@@ -41,6 +41,7 @@ typedef struct IndexCache {
} IndexCache;
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
typedef struct CacheTerm {
// key
int32_t nColVal;
......@@ -57,6 +58,9 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type);
void indexCacheDestroy(void* cache);
Iterate* indexCacheIteratorCreate(IndexCache* cache);
void indexCacheIteratorDestroy(Iterate* iiter);
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
// int indexCacheGet(void *cache, uint64_t *rst);
......@@ -66,6 +70,8 @@ void indexCacheRef(IndexCache* cache);
void indexCacheUnRef(IndexCache* cache);
void indexCacheDebug(IndexCache* cache);
void indexCacheDestroySkiplist(SSkipList* slt);
#ifdef __cplusplus
}
#endif
......
......@@ -319,6 +319,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min);
StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallback callback);
FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut);
void fstStreamBuilderDestroy(FstStreamBuilder* b);
// set up bound range
// refator, simple code by marco
......
......@@ -113,6 +113,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArr
void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type);
void tfileWriteClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw);
int tfileWriterPut(TFileWriter* tw, void* data);
......@@ -123,6 +125,14 @@ IndexTFile* indexTFileCreate(const char* path);
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
Iterate* tfileIteratorCreate(TFileReader* reader);
void tfileIteratorDestroy(Iterate* iterator);
TFileValue* tfileValueCreate(char* val);
int tfileValuePush(TFileValue* tf, uint64_t val);
void tfileValueDestroy(TFileValue* tf);
#ifdef __cplusplus
}
......
......@@ -75,9 +75,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
sIdx->tindex = indexTFileCreate(path);
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->cVersion = 1;
sIdx->path = calloc(1, strlen(path) + 1);
memcpy(sIdx->path, path, strlen(path));
pthread_mutex_init(&sIdx->mtx, NULL);
*index = sIdx;
return 0;
#endif
......@@ -361,14 +364,96 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
}
return 0;
}
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { return -1; }
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache;
IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
// handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
Iterate* tfileIter = tfileIteratorCreate(pReader);
SArray* result = taosArrayInit(1024, sizeof(void*));
bool cn = cacheIter->next(cacheIter);
bool tn = tfileIter->next(tfileIter);
while (cn == true && tn == true) {
IterateValue* cv = cacheIter->getValue(cacheIter);
IterateValue* tv = tfileIter->getValue(tfileIter);
// dump value
int comp = strcmp(cv->colVal, tv->colVal);
if (comp == 0) {
TFileValue* tfv = tfileValueCreate(cv->colVal);
taosArrayAddAll(tfv->tableId, cv->val);
taosArrayAddAll(tfv->tableId, tv->val);
taosArrayPush(result, &tfv);
cn = cacheIter->next(cacheIter);
tn = tfileIter->next(tfileIter);
continue;
} else if (comp < 0) {
TFileValue* tfv = tfileValueCreate(cv->colVal);
taosArrayAddAll(tfv->tableId, cv->val);
taosArrayPush(result, &tfv);
// copy to final Result;
cn = cacheIter->next(cacheIter);
} else {
TFileValue* tfv = tfileValueCreate(tv->colVal);
taosArrayPush(result, &tfv);
taosArrayAddAll(tfv->tableId, tv->val);
// copy to final result
tn = tfileIter->next(tfileIter);
}
}
while (cn == true) {
IterateValue* cv = cacheIter->getValue(cacheIter);
TFileValue* tfv = tfileValueCreate(cv->colVal);
taosArrayAddAll(tfv->tableId, cv->val);
taosArrayPush(result, &tfv);
cn = cacheIter->next(cacheIter);
}
while (tn == true) {
IterateValue* tv = tfileIter->getValue(tfileIter);
TFileValue* tfv = tfileValueCreate(tv->colVal);
taosArrayAddAll(tfv->tableId, tv->val);
taosArrayPush(result, &tfv);
tn = tfileIter->next(tfileIter);
}
int32_t version = CACHE_VERSION(pCache);
uint8_t colType = pCache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, pCache->colName, colType);
if (tw == NULL) {
indexError("faile to open file to write");
} else {
int ret = tfileWriterPut(tw, result);
if (ret != 0) { indexError("faile to write into tindex "); }
}
// not free later, just put int table cache
SSkipList* timm = (SSkipList*)pCache->imm;
pCache->imm = NULL; // or throw int bg thread
indexCacheDestroySkiplist(timm);
tfileWriteClose(tw);
indexCacheIteratorDestroy(cacheIter);
tfileIteratorDestroy(tfileIter);
tfileReaderUnRef(pReader);
indexCacheUnRef(pCache);
return 0;
}
void iterateValueDestroy(IterateValue* value, bool destroy) {
if (destroy) {
taosArrayDestroy(value->val);
} else {
taosArrayClear(value->val);
}
free(value->colVal);
value->colVal = NULL;
}
......@@ -94,6 +94,16 @@ void indexCacheDebug(IndexCache* cache) {
tSkipListDestroyIter(iter);
}
void indexCacheDestroySkiplist(SSkipList* slt) {
SSkipListIterator* iter = tSkipListCreateIter(slt);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) {}
}
tSkipListDestroyIter(iter);
}
void indexCacheDestroy(void* cache) {
IndexCache* pCache = cache;
if (pCache == NULL) { return; }
......@@ -108,6 +118,48 @@ static void doMergeWork(SSchedMsg* msg) {
SIndex* sidx = (SIndex*)pCache->index;
indexFlushCacheTFile(sidx, pCache);
}
static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
if (iter == NULL) { return false; }
IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false);
bool next = tSkipListIterNext(iter);
if (next) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
iv->type = ct->operaType;
iv->colVal = ct->colVal;
taosArrayPush(iv->val, &ct->uid);
}
return next;
}
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
return &iter->val;
}
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
Iterate* iiter = calloc(1, sizeof(Iterate));
if (iiter == NULL) { return NULL; }
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL;
iiter->next = indexCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue;
return iiter;
}
void indexCacheIteratorDestroy(Iterate* iter) {
if (iter == NULL) { return; }
tSkipListDestroyIter(iter->iter);
iterateValueDestroy(&iter->val, true);
free(iter);
}
int indexCacheSchedToMerge(IndexCache* pCache) {
SSchedMsg schedMsg = {0};
......
......@@ -23,6 +23,13 @@
#include "taosdef.h"
#include "tcompare.h"
typedef struct TFileFstIter {
FstStreamBuilder* fb;
StreamWithState* st;
AutomationCtx* ctx;
TFileReader* rdr;
} TFileFstIter;
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t))
static int tfileStrCompare(const void* a, const void* b);
......@@ -184,6 +191,23 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
return ret;
}
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
char filename[128] = {0};
int32_t coldId = 1;
tfileGenFileName(filename, suid, coldId, version);
char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, true, 1024 * 1024);
TFileHeader tfh = {0};
tfh.suid = suid;
tfh.version = version;
memcpy(tfh.colName, colName, strlen(colName));
tfh.colType = colType;
return tfileWriterCreate(wcx, &tfh);
}
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
// char pathBuf[128] = {0};
// sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version);
......@@ -279,6 +303,11 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
tw->fb = NULL;
return 0;
}
void tfileWriteClose(TFileWriter* tw) {
if (tw == NULL) { return; }
writerCtxDestroy(tw->ctx);
free(tw);
}
void tfileWriterDestroy(TFileWriter* tw) {
if (tw == NULL) { return; }
......@@ -314,6 +343,71 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
return 0;
}
static bool tfileIteratorNext(Iterate* iiter) {
IterateValue* iv = &iiter->val;
iterateValueDestroy(iv, false);
// SArray* tblIds = iv->val;
char* colVal = NULL;
uint64_t offset = 0;
TFileFstIter* tIter = iiter->iter;
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
if (rt == NULL) { return false; }
int32_t sz = 0;
char* ch = (char*)fstSliceData(&rt->data, &sz);
colVal = calloc(1, sz + 1);
memcpy(colVal, ch, sz);
offset = (uint64_t)(rt->out.out);
swsResultDestroy(rt);
// set up iterate value
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
iv->colVal = colVal;
// std::string key(ch, sz);
}
static IterateValue* tifileIterateGetValue(Iterate* iter) {
return &iter->val;
}
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
if (tIter == NULL) { return NULL; }
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
tIter->fb = fstSearch(reader->fst, tIter->ctx);
tIter->st = streamBuilderIntoStream(tIter->fb);
tIter->rdr = reader;
return tIter;
}
Iterate* tfileIteratorCreate(TFileReader* reader) {
Iterate* iter = calloc(1, sizeof(Iterate));
iter->iter = tfileFstIteratorCreate(reader);
if (iter->iter == NULL) { return NULL; }
iter->next = tfileIteratorNext;
iter->getValue = tifileIterateGetValue;
return iter;
}
void tfileIteratorDestroy(Iterate* iter) {
if (iter == NULL) { return; }
IterateValue* iv = &iter->val;
iterateValueDestroy(iv, true);
TFileFstIter* tIter = iter->iter;
streamWithStateDestroy(tIter->st);
fstStreamBuilderDestroy(tIter->fb);
automCtxDestroy(tIter->ctx);
free(iter);
}
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
if (tf == NULL) { return NULL; }
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
......@@ -334,6 +428,23 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
return fn(av->colVal, bv->colVal);
}
TFileValue* tfileValueCreate(char* val) {
TFileValue* tf = calloc(1, sizeof(TFileValue));
if (tf == NULL) { return NULL; }
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
return tf;
}
int tfileValuePush(TFileValue* tf, uint64_t val) {
if (tf == NULL) { return -1; }
taosArrayPush(tf->tableId, &val);
return 0;
}
void tfileValueDestroy(TFileValue* tf) {
taosArrayDestroy(tf->tableId);
free(tf);
}
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
int sz = taosArrayGetSize(ids);
SERIALIZE_VAR_TO_BUF(buf, sz, int32_t);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册