提交 f99f6ec8 编写于 作者: dengyihao's avatar dengyihao

add flush-helper function

上级 f45119ea
...@@ -85,6 +85,18 @@ SIndexTerm* indexTermCreate(int64_t suid, ...@@ -85,6 +85,18 @@ SIndexTerm* indexTermCreate(int64_t suid,
int32_t nColVal); int32_t nColVal);
void indexTermDestroy(SIndexTerm* p); void indexTermDestroy(SIndexTerm* p);
/*
* init index
*
*/
int32_t indexInit();
/*
* destory index
*
*/
void indexCleanUp();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -49,7 +49,6 @@ struct SIndex { ...@@ -49,7 +49,6 @@ struct SIndex {
SHashObj* colObj; // < field name, field id> SHashObj* colObj; // < field name, field id>
int64_t suid; // current super table id, -1 is normal table int64_t suid; // current super table id, -1 is normal table
int colId; // field id allocated to cache
int32_t cVersion; // current version allocated to cache int32_t cVersion; // current version allocated to cache
SIndexStat stat; SIndexStat stat;
...@@ -88,41 +87,39 @@ typedef struct SIndexTermQuery { ...@@ -88,41 +87,39 @@ typedef struct SIndexTermQuery {
EIndexQueryType qType; EIndexQueryType qType;
} SIndexTermQuery; } SIndexTermQuery;
#define indexFatal(...) \ typedef struct Iterate {
do { \ void* iter;
if (sDebugFlag & DEBUG_FATAL) { \ int8_t type;
taosPrintLog("index FATAL ", 255, __VA_ARGS__); \ char* colVal;
} \ SArray* val;
} Iterate;
extern void* indexQhandle;
int indexFlushCacheTFile(SIndex* sIdx, void*);
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
} while (0) } while (0)
#define indexError(...) \ #define indexError(...) \
do { \ do { \
if (sDebugFlag & DEBUG_ERROR) { \ if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \
taosPrintLog("index ERROR ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexWarn(...) \ #define indexWarn(...) \
do { \ do { \
if (sDebugFlag & DEBUG_WARN) { \ if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \
taosPrintLog("index WARN ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexInfo(...) \ #define indexInfo(...) \
do { \ do { \
if (sDebugFlag & DEBUG_INFO) { \ if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \
taosPrintLog("index ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexDebug(...) \ #define indexDebug(...) \
do { \ do { \
if (sDebugFlag & DEBUG_DEBUG) { \ if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexTrace(...) \ #define indexTrace(...) \
do { \ do { \
if (sDebugFlag & DEBUG_TRACE) { \ if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0) } while (0)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -22,10 +22,8 @@ ...@@ -22,10 +22,8 @@
// ----------------- key structure in skiplist --------------------- // ----------------- key structure in skiplist ---------------------
/* A data row, the format is like below: /* A data row, the format is like below:
* content: |<--totalLen-->|<-- fieldid-->|<--field type-->|<-- value len--->| * content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->|
* |<-- value -->|<--uid -->|<--version--->|<-- itermType -->| * len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
* len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|
* <--valuelen->|<--uint64_t->| * <-- int32_t-->|<-- int8_t --->|
*/ */
#ifdef __cplusplus #ifdef __cplusplus
...@@ -34,12 +32,17 @@ extern "C" { ...@@ -34,12 +32,17 @@ extern "C" {
typedef struct IndexCache { typedef struct IndexCache {
T_REF_DECLARE() T_REF_DECLARE()
SSkipList* skiplist; SSkipList *mem, *imm;
SIndex* index;
char* colName;
int32_t version;
int32_t nTerm;
int8_t type;
} IndexCache; } IndexCache;
typedef struct CacheTerm { typedef struct CacheTerm {
// key // key
int32_t colId;
int32_t nColVal; int32_t nColVal;
char* colVal; char* colVal;
int32_t version; int32_t version;
...@@ -49,14 +52,18 @@ typedef struct CacheTerm { ...@@ -49,14 +52,18 @@ typedef struct CacheTerm {
SIndexOperOnColumn operaType; SIndexOperOnColumn operaType;
} CacheTerm; } CacheTerm;
// //
IndexCache* indexCacheCreate();
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type);
void indexCacheDestroy(void* cache); void indexCacheDestroy(void* cache);
int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid); int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
// int indexCacheGet(void *cache, uint64_t *rst); // int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s); int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s);
void indexCacheRef(IndexCache* cache);
void indexCacheUnRef(IndexCache* cache);
void indexCacheDebug(IndexCache* cache); void indexCacheDebug(IndexCache* cache);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -105,9 +105,13 @@ void tfileCacheDestroy(TFileCache* tcache); ...@@ -105,9 +105,13 @@ void tfileCacheDestroy(TFileCache* tcache);
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key); TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key);
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader); void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader);
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx); TFileReader* tfileReaderCreate(WriterCtx* ctx);
void tfileReaderDestroy(TFileReader* reader); void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result); int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result);
void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw); void tfileWriterDestroy(TFileWriter* tw);
......
...@@ -18,11 +18,26 @@ ...@@ -18,11 +18,26 @@
#include "index_cache.h" #include "index_cache.h"
#include "index_tfile.h" #include "index_tfile.h"
#include "tdef.h" #include "tdef.h"
#include "tsched.h"
#ifdef USE_LUCENE #ifdef USE_LUCENE
#include "lucene++/Lucene_c.h" #include "lucene++/Lucene_c.h"
#endif #endif
#define INDEX_NUM_OF_THREADS 4
#define INDEX_QUEUE_SIZE 4
void* indexQhandle = NULL;
int32_t indexInit() {
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
return indexQhandle == NULL ? -1 : 0;
// do nothing
}
void indexCleanUp() {
taosCleanUpScheduler(indexQhandle);
}
static int uidCompare(const void* a, const void* b) { static int uidCompare(const void* a, const void* b) {
uint64_t u1 = *(uint64_t*)a; uint64_t u1 = *(uint64_t*)a;
uint64_t u2 = *(uint64_t*)b; uint64_t u2 = *(uint64_t*)b;
...@@ -38,16 +53,15 @@ typedef struct SIdxColInfo { ...@@ -38,16 +53,15 @@ typedef struct SIdxColInfo {
} SIdxColInfo; } SIdxColInfo;
static pthread_once_t isInit = PTHREAD_ONCE_INIT; static pthread_once_t isInit = PTHREAD_ONCE_INIT;
static void indexInit(); // static void indexInit();
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
static int indexFlushCacheTFile(SIndex* sIdx);
static void indexInterResultsDestroy(SArray* results); static void indexInterResultsDestroy(SArray* results);
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult); static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
pthread_once(&isInit, indexInit); // pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex)); SIndex* sIdx = calloc(1, sizeof(SIndex));
if (sIdx == NULL) { return -1; } if (sIdx == NULL) { return -1; }
...@@ -57,10 +71,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -57,10 +71,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#endif #endif
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
sIdx->cache = (void*)indexCacheCreate(); // sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = NULL; sIdx->tindex = indexTFileCreate(path);
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->colId = 1;
sIdx->cVersion = 1; sIdx->cVersion = 1;
pthread_mutex_init(&sIdx->mtx, NULL); pthread_mutex_init(&sIdx->mtx, NULL);
...@@ -80,6 +93,12 @@ void indexClose(SIndex* sIdx) { ...@@ -80,6 +93,12 @@ void indexClose(SIndex* sIdx) {
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
indexCacheDestroy(sIdx->cache); indexCacheDestroy(sIdx->cache);
void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) {
IndexCache** pCache = iter;
if (*pCache) { indexCacheUnRef(*pCache); }
iter = taosHashIterate(sIdx->colObj, iter);
}
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
pthread_mutex_destroy(&sIdx->mtx); pthread_mutex_destroy(&sIdx->mtx);
#endif #endif
...@@ -110,29 +129,24 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -110,29 +129,24 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
pthread_mutex_lock(&index->mtx); pthread_mutex_lock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
SIdxColInfo* fi = taosHashGet(index->colObj, p->colName, p->nColName); IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
if (fi == NULL) { if (*cache == NULL) {
SIdxColInfo tfi = {.colId = index->colId}; IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
index->cVersion++; taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
index->colId++;
taosHashPut(index->colObj, p->colName, p->nColName, &tfi, sizeof(tfi));
} else {
// TODO, del
} }
} }
pthread_mutex_unlock(&index->mtx); pthread_mutex_unlock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
SIdxColInfo* fi = taosHashGet(index->colObj, p->colName, p->nColName); IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
assert(fi != NULL);
int32_t colId = fi->colId; assert(*cache != NULL);
int32_t version = index->cVersion; int ret = indexCachePut(*cache, p, uid);
int ret = indexCachePut(index->cache, p, colId, version, uid);
if (ret != 0) { return ret; } if (ret != 0) { return ret; }
} }
#endif
#endif
return 0; return 0;
} }
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
...@@ -281,32 +295,26 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms) { ...@@ -281,32 +295,26 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms) {
taosArrayDestroy(terms); taosArrayDestroy(terms);
} }
void indexInit() {
// do nothing
}
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) { static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
int32_t version = -1;
int16_t colId = -1;
SIdxColInfo* colInfo = NULL;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
const char* colName = term->colName; const char* colName = term->colName;
int32_t nColName = term->nColName; int32_t nColName = term->nColName;
// Get col info
IndexCache* cache = NULL;
pthread_mutex_lock(&sIdx->mtx); pthread_mutex_lock(&sIdx->mtx);
colInfo = taosHashGet(sIdx->colObj, colName, nColName); IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
if (colInfo == NULL) { if (*pCache == NULL) {
pthread_mutex_unlock(&sIdx->mtx); pthread_mutex_unlock(&sIdx->mtx);
return -1; return -1;
} }
colId = colInfo->colId; cache = *pCache;
version = colInfo->cVersion;
pthread_mutex_unlock(&sIdx->mtx); pthread_mutex_unlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t)); *result = taosArrayInit(4, sizeof(uint64_t));
// TODO: iterator mem and tidex // TODO: iterator mem and tidex
STermValueType s; STermValueType s;
if (0 == indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) { if (0 == indexCacheSearch(cache, query, *result, &s)) {
if (s == kTypeDeletion) { if (s == kTypeDeletion) {
indexInfo("col: %s already drop by other opera", term->colName); indexInfo("col: %s already drop by other opera", term->colName);
// coloum already drop by other oper, no need to query tindex // coloum already drop by other oper, no need to query tindex
...@@ -353,9 +361,14 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType ...@@ -353,9 +361,14 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
} }
return 0; return 0;
} }
static int indexFlushCacheTFile(SIndex* sIdx) { int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { return -1; } if (sIdx == NULL) { return -1; }
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
tfileReaderUnRef(pReader);
indexCacheUnRef(pCache);
return 0; return 0;
} }
...@@ -16,9 +16,11 @@ ...@@ -16,9 +16,11 @@
#include "index_cache.h" #include "index_cache.h"
#include "index_util.h" #include "index_util.h"
#include "tcompare.h" #include "tcompare.h"
#include "tsched.h"
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define CACH_LIMIT 1000000
// ref index_cache.h:22 // ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \ //#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
...@@ -38,9 +40,6 @@ static int32_t compareKey(const void* l, const void* r) { ...@@ -38,9 +40,6 @@ static int32_t compareKey(const void* l, const void* r) {
CacheTerm* lt = (CacheTerm*)l; CacheTerm* lt = (CacheTerm*)l;
CacheTerm* rt = (CacheTerm*)r; CacheTerm* rt = (CacheTerm*)r;
// compare colId
if (lt->colId != rt->colId) { return lt->colId - rt->colId; }
// compare colVal // compare colVal
int i, j; int i, j;
for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) { for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) {
...@@ -56,71 +55,40 @@ static int32_t compareKey(const void* l, const void* r) { ...@@ -56,71 +55,40 @@ static int32_t compareKey(const void* l, const void* r) {
return -1; return -1;
} }
// compare version // compare version
return rt->version - lt->version; return rt->version - lt->version;
}
// char* lp = (char*)l; static SSkipList* indexInternalCacheCreate(int8_t type) {
// char* rp = (char*)r; if (type == TSDB_DATA_TYPE_BINARY) {
return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
//// compare col id }
// int16_t lf, rf; // cold id
// memcpy(&lf, lp, sizeof(lf));
// memcpy(&rf, rp, sizeof(rf));
// if (lf != rf) { return lf < rf ? -1 : 1; }
// lp += sizeof(lf);
// rp += sizeof(rf);
//// skip value len
// int32_t lfl, rfl;
// memcpy(&lfl, lp, sizeof(lfl));
// memcpy(&rfl, rp, sizeof(rfl));
// lp += sizeof(lfl);
// rp += sizeof(rfl);
//// compare value
// int32_t i, j;
// for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) {
// if (lp[i] == rp[j]) {
// continue;
// } else {
// return lp[i] < rp[j] ? -1 : 1;
// }
//}
// if (i < lfl) {
// return 1;
//} else if (j < rfl) {
// return -1;
//}
// lp += lfl;
// rp += rfl;
//// compare version, desc order
// int32_t lv, rv;
// memcpy(&lv, lp, sizeof(lv));
// memcpy(&rv, rp, sizeof(rv));
// if (lv != rv) { return lv < rv ? 1 : -1; }
// return 0;
} }
IndexCache* indexCacheCreate() {
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
IndexCache* cache = calloc(1, sizeof(IndexCache)); IndexCache* cache = calloc(1, sizeof(IndexCache));
if (cache == NULL) { if (cache == NULL) {
indexError("failed to create index cache"); indexError("failed to create index cache");
return NULL; return NULL;
} };
cache->skiplist = cache->mem = indexInternalCacheCreate(type);
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
cache->colName = calloc(1, strlen(colName) + 1);
memcpy(cache->colName, colName, strlen(colName));
cache->type = type;
cache->index = idx;
cache->version = 0;
indexCacheRef(cache);
return cache; return cache;
} }
void indexCacheDebug(IndexCache* cache) { void indexCacheDebug(IndexCache* cache) {
SSkipListIterator* iter = tSkipListCreateIter(cache->skiplist); SSkipListIterator* iter = tSkipListCreateIter(cache->mem);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) { if (ct != NULL) {
// TODO, add more debug info // TODO, add more debug info
indexInfo("{colId:%d, colVal: %s, version: %d} \t", ct->colId, ct->colVal, ct->version); indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
} }
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
...@@ -129,37 +97,71 @@ void indexCacheDebug(IndexCache* cache) { ...@@ -129,37 +97,71 @@ void indexCacheDebug(IndexCache* cache) {
void indexCacheDestroy(void* cache) { void indexCacheDestroy(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
if (pCache == NULL) { return; } if (pCache == NULL) { return; }
tSkipListDestroy(pCache->skiplist); tSkipListDestroy(pCache->mem);
tSkipListDestroy(pCache->imm);
free(pCache->colName);
free(pCache); free(pCache);
} }
int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index;
indexFlushCacheTFile(sidx, pCache);
}
int indexCacheSchedToMerge(IndexCache* pCache) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doMergeWork;
schedMsg.ahandle = pCache;
schedMsg.thandle = NULL;
schedMsg.msg = NULL;
taosScheduleTask(indexQhandle, &schedMsg);
}
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
if (cache == NULL) { return -1; } if (cache == NULL) { return -1; }
IndexCache* pCache = cache; IndexCache* pCache = cache;
indexCacheRef(pCache);
// encode data // encode data
CacheTerm* ct = calloc(1, sizeof(CacheTerm)); CacheTerm* ct = calloc(1, sizeof(CacheTerm));
if (cache == NULL) { return -1; } if (cache == NULL) { return -1; }
// set up key // set up key
ct->colId = colId;
ct->colType = term->colType; ct->colType = term->colType;
ct->nColVal = term->nColVal; ct->nColVal = term->nColVal;
ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1)); ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1));
memcpy(ct->colVal, term->colVal, ct->nColVal); memcpy(ct->colVal, term->colVal, ct->nColVal);
ct->version = version; ct->version = atomic_add_fetch_32(&pCache->version, 1);
// set value
ct->uid = uid; ct->uid = uid;
ct->operaType = term->operType; ct->operaType = term->operType;
tSkipListPut(pCache->skiplist, (char*)ct); tSkipListPut(pCache->mem, (char*)ct);
pCache->nTerm += 1;
if (pCache->nTerm >= CACH_LIMIT) {
pCache->nTerm = 0;
while (pCache->imm != NULL) {
// do nothong
}
pCache->imm = pCache->mem;
pCache->mem = indexInternalCacheCreate(pCache->type);
// sched to merge
// unref cache int bgwork
indexCacheSchedToMerge(pCache);
}
indexCacheUnRef(pCache);
return 0; return 0;
// encode end // encode end
} }
int indexCacheDel(void* cache, int32_t fieldId, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
return 0; return 0;
} }
int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
if (cache == NULL) { return -1; } if (cache == NULL) { return -1; }
IndexCache* pCache = cache; IndexCache* pCache = cache;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
...@@ -167,15 +169,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t ...@@ -167,15 +169,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t
CacheTerm* ct = calloc(1, sizeof(CacheTerm)); CacheTerm* ct = calloc(1, sizeof(CacheTerm));
if (ct == NULL) { return -1; } if (ct == NULL) { return -1; }
ct->colId = colId;
ct->nColVal = term->nColVal; ct->nColVal = term->nColVal;
ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1)); ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1));
memcpy(ct->colVal, term->colVal, ct->nColVal); memcpy(ct->colVal, term->colVal, ct->nColVal);
ct->version = version; ct->version = atomic_load_32(&pCache->version);
char* key = getIndexKey(ct); char* key = getIndexKey(ct);
// TODO handle multi situation later, and refactor // TODO handle multi situation later, and refactor
SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->skiplist, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
if (node != NULL) { if (node != NULL) {
...@@ -209,3 +210,12 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t ...@@ -209,3 +210,12 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t
} }
return 0; return 0;
} }
void indexCacheRef(IndexCache* cache) {
int ref = T_REF_INC(cache);
UNUSED(ref);
}
void indexCacheUnRef(IndexCache* cache) {
int ref = T_REF_DEC(cache);
if (ref == 0) { indexCacheDestroy(cache); }
}
...@@ -33,11 +33,9 @@ static int tfileWriteHeader(TFileWriter* writer); ...@@ -33,11 +33,9 @@ static int tfileWriteHeader(TFileWriter* writer);
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
static int tfileWriteData(TFileWriter* write, TFileValue* tval); static int tfileWriteData(TFileWriter* write, TFileValue* tval);
static int tfileReaderLoadHeader(TFileReader* reader); static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader); static int tfileReaderLoadFst(TFileReader* reader);
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static void tfileReaderRef(TFileReader* reader);
static void tfileReaderUnRef(TFileReader* reader);
static int tfileGetFileList(const char* path, SArray* result); static int tfileGetFileList(const char* path, SArray* result);
static int tfileRmExpireFile(SArray* result); static int tfileRmExpireFile(SArray* result);
...@@ -131,7 +129,6 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) ...@@ -131,7 +129,6 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
return; return;
} }
TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* tfileReaderCreate(WriterCtx* ctx) {
TFileReader* reader = calloc(1, sizeof(TFileReader)); TFileReader* reader = calloc(1, sizeof(TFileReader));
if (reader == NULL) { return NULL; } if (reader == NULL) { return NULL; }
...@@ -317,6 +314,11 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { ...@@ -317,6 +314,11 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
return 0; return 0;
} }
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
if (tf == NULL) { return NULL; }
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
return tfileCacheGet(tf->cache, &key);
}
static int tfileStrCompare(const void* a, const void* b) { static int tfileStrCompare(const void* a, const void* b) {
int ret = strcmp((char*)a, (char*)b); int ret = strcmp((char*)a, (char*)b);
...@@ -423,12 +425,12 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* ...@@ -423,12 +425,12 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
free(buf); free(buf);
return 0; return 0;
} }
static void tfileReaderRef(TFileReader* reader) { void tfileReaderRef(TFileReader* reader) {
int ref = T_REF_INC(reader); int ref = T_REF_INC(reader);
UNUSED(ref); UNUSED(ref);
} }
static void tfileReaderUnRef(TFileReader* reader) { void tfileReaderUnRef(TFileReader* reader) {
int ref = T_REF_DEC(reader); int ref = T_REF_DEC(reader);
if (ref == 0) { tfileReaderDestroy(reader); } if (ref == 0) { tfileReaderDestroy(reader); }
} }
...@@ -479,9 +481,9 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, ...@@ -479,9 +481,9 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
return -1; return -1;
} }
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
SERIALIZE_MEM_TO_BUF(buf, key, suid); // SERIALIZE_MEM_TO_BUF(buf, key, suid);
SERIALIZE_VAR_TO_BUF(buf, '_', char); // SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, colType); // SERIALIZE_MEM_TO_BUF(buf, key, colType);
SERIALIZE_VAR_TO_BUF(buf, '_', char); // SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
} }
...@@ -471,10 +471,10 @@ class CacheObj { ...@@ -471,10 +471,10 @@ class CacheObj {
public: public:
CacheObj() { CacheObj() {
// TODO // TODO
cache = indexCacheCreate(); cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY);
} }
int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) {
int ret = indexCachePut(cache, term, colId, version, uid); int ret = indexCachePut(cache, term, uid);
if (ret != 0) { if (ret != 0) {
// //
std::cout << "failed to put into cache: " << ret << std::endl; std::cout << "failed to put into cache: " << ret << std::endl;
...@@ -486,7 +486,7 @@ class CacheObj { ...@@ -486,7 +486,7 @@ class CacheObj {
indexCacheDebug(cache); indexCacheDebug(cache);
} }
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
int ret = indexCacheSearch(cache, query, colId, version, result, s); int ret = indexCacheSearch(cache, query, result, s);
if (ret != 0) { if (ret != 0) {
// //
std::cout << "failed to get from cache:" << ret << std::endl; std::cout << "failed to get from cache:" << ret << std::endl;
...@@ -561,7 +561,7 @@ TEST_F(IndexCacheEnv, cache_test) { ...@@ -561,7 +561,7 @@ TEST_F(IndexCacheEnv, cache_test) {
} }
{ {
std::string colVal("v4"); std::string colVal("v4");
for (size_t i = 0; i < 100; i++) { for (size_t i = 0; i < 10; i++) {
colVal[colVal.size() - 1] = 'a' + i; colVal[colVal.size() - 1] = 'a' + i;
SIndexTerm* term = SIndexTerm* term =
indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
...@@ -578,7 +578,8 @@ TEST_F(IndexCacheEnv, cache_test) { ...@@ -578,7 +578,8 @@ TEST_F(IndexCacheEnv, cache_test) {
STermValueType valType; STermValueType valType;
coj->Get(&query, colId, 10000, ret, &valType); coj->Get(&query, colId, 10000, ret, &valType);
assert(taosArrayGetSize(ret) == 3); // std::cout << "size : " << taosArrayGetSize(ret) << std::endl;
assert(taosArrayGetSize(ret) == 4);
} }
{ {
std::string colVal("v2"); std::string colVal("v2");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册